it works and it is awesome

Rushmore75 2025-03-19 15:04:00 -06:00
parent bac3cd9d1d
commit 71b7b2d7bc
4 changed files with 66 additions and 118 deletions

src/db.rs (141 changed lines)

@@ -4,43 +4,55 @@ use base64::{
Engine,
};
use metrics::counter;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use std::{fmt::Debug, sync::LazyLock, time::Instant};
use surrealdb::{
engine::remote::ws::{Client, Ws},
error::Db,
opt::auth::Root,
sql::{Query, Thing},
sql::Thing,
Error::Api,
Response, Surreal,
};
use tokio::sync::Mutex;
use tracing::{error, instrument, trace, warn};
use url::Url;
use crate::{Config, Timer};
// static LOCK: LazyLock<Arc<Mutex<bool>>> = LazyLock::new(|| Arc::new(Mutex::new(true)));
static LOCK: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(true));
const CUSTOM_ENGINE: engine::GeneralPurpose =
engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);
const ROUND_TRIP_METRIC: &'static str = "surql_trips";
const TIME_SPENT_ON_LOCK: &'static str = "surql_lock_waiting_ms";
const STORE: &'static str = "surql_store_calls";
const LINK: &'static str = "surql_link_calls";
#[derive(Serialize, Deserialize, Clone)]
#[derive(Deserialize, Clone)]
pub struct Website {
/// The url that this data is found at
pub site: Url,
/// Whether or not this link has been crawled yet
pub crawled: bool,
#[serde(skip_serializing)]
id: Option<Thing>,
}
impl Serialize for Website {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer {
let mut state = serializer.serialize_struct("Website", 2)?;
state.serialize_field("crawled", &self.crawled)?;
// to_string() ensures site is serialized as its plain string form
state.serialize_field("site", &self.site.to_string())?;
state.end()
}
}
// manual impl to make tracing look nicer
impl Debug for Website {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let site = (self.site.domain().unwrap_or("n/a")).to_string() + self.site.path();
f.debug_struct("Website").field("site", &site).finish()
f.debug_struct("Website").field("site", &self.site).finish()
}
}
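
For reference, the manual impl above emits only `crawled` and `site`, so `id` never reaches SurrealDB and the database assigns its own record ids. A minimal usage sketch, assuming serde_json as the consumer and that Website::new takes the raw URL string as in main.rs (the example URL is hypothetical):

// Sketch: only the two explicitly serialized fields appear in the output.
let w = Website::new("https://example.com/page", false);
let json = serde_json::to_string(&w).expect("serialize");
// json == r#"{"crawled":false,"site":"https://example.com/page"}"#
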
@@ -52,9 +64,8 @@ impl Website {
Err(_) => todo!(),
};
Self {
id: None,
crawled,
site,
site
}
}
@@ -64,7 +75,13 @@ impl Website {
}
pub fn get_url_as_string(site: &Url) -> String {
let domain = site.domain().unwrap_or("DOMAIN").to_string();
let domain = match site.domain() {
Some(s) => s.to_string(),
None => {
warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string());
"localhost".to_string()
}
};
let path = site.path();
domain + path
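
A quick sketch of the new fallback behavior (example URLs are illustrative): a named host flattens to domain + path, while a host that is not a domain, such as an IP address, logs the warning and falls back to "localhost":

let named = Url::parse("https://example.com/a/b").unwrap();
assert_eq!(Website::get_url_as_string(&named), "example.com/a/b");

let ip = Url::parse("https://127.0.0.1/status").unwrap();
assert_eq!(Website::get_url_as_string(&ip), "localhost/status");
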
@@ -82,19 +99,19 @@ impl Website {
if len == 0 {
return;
}
let from = &self.site;
let from = self.site.to_string();
// let to = other.site.to_string();
trace!("Linking {from} to {} other pages.", other.len());
let msg = format!("Linked {len} pages");
trace!("Linking {} pages to {from}", other.len());
let msg = format!("Linked {len} pages to {from}");
let timer = Timer::start(&msg);
// prevent the timer from being dropped instantly.
let _ = timer;
counter!(ROUND_TRIP_METRIC).increment(1);
counter!(LINK).increment(1);
match db
.query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)")
.bind(("in", from))
.bind(("in", from.clone()))
.bind(("out", other))
.await
{
@@ -106,15 +123,15 @@ impl Website {
let _: Vec<usize> = vec;
if let Some(num) = vec.get(0) {
if *num == len {
trace!("Link OK");
trace!("Link for {from} OK - {num}/{len}");
return;
} else {
warn!("Didn't link all the records. {num}/{len}");
error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e);
return;
}
}
}
warn!("Linking request succeeded but couldn't verify the results.");
error!("Linking request succeeded but couldn't verify the results.");
}
Err(e) => {
error!("{}", e.to_string());
@@ -125,11 +142,17 @@ impl Website {
// Insert every item in the vec into surreal; the crawled state will be preserved as TRUE
// if it is already TRUE in the database or the incoming data is TRUE.
pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
counter!(ROUND_TRIP_METRIC).increment(1);
counter!(STORE).increment(1);
let mut things = Vec::with_capacity(all.len());
// TODO this only allows one thread to be in the database at a time.
// This is currently required since otherwise we get write errors.
// If the default `crawled` is set to false, we might not need to write any more
// than just the name. `accessed_at` is fun but not needed.
let now = Instant::now();
let lock = LOCK.lock().await;
counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64);
match db
.query(
"INSERT INTO website $array
@@ -156,79 +179,9 @@ impl Website {
error!("{:?}", err);
}
}
drop(lock);
things
}
#[instrument(name = "surql_store", skip_all)]
pub async fn store(&self, db: &Surreal<Client>) -> Option<Thing> {
counter!(STORE).increment(1);
let counter = counter!(ROUND_TRIP_METRIC);
let t = Timer::start("Stored link");
let _ = t;
counter.increment(1);
// check if it's been gone through before
let mut response = db
.query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1")
.bind(("site", self.site.to_string()))
.await
.expect("Failed to check surreal for duplicates!");
if let Some(old) = response
.take::<Option<Website>>(0)
.expect("Failed to read response from surreal for duplicates.")
{
// site exists already
if let Some(id) = old.id {
// make sure to preserve the "crawled status"
let mut new = self.clone();
new.crawled = old.crawled | new.crawled;
counter.increment(1);
// update the record
match db.upsert((id.tb, id.id.to_string())).content(new).await {
Ok(e) => {
if let Some(a) = e {
let _: Record = a;
return Some(a.id);
}
}
Err(e) => {
match e {
surrealdb::Error::Db(error) => match error {
Db::QueryCancelled => todo!(),
Db::QueryNotExecuted => todo!(),
Db::QueryNotExecutedDetail { message: _ } => todo!(),
_ => {}
},
_ => {}
}
// error!("{}", e);
}
};
}
} else {
counter.increment(1);
// site doesn't exist yet
match db.create("website").content(self.clone()).await {
Ok(e) => {
let _: Option<Record> = e;
if let Some(a) = e {
let _: Record = a;
return Some(a.id);
}
}
Err(a) => error!("{:?}", a),
};
}
None
}
}
impl ToString for Website {
fn to_string(&self) -> String {
self.site.to_string()
}
}
#[derive(Debug, Serialize)]
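
The serialization of writes introduced in store_all, reduced to a standalone sketch. The helper name with_db_write_lock is hypothetical; the real code inlines the lock around the INSERT query, and the counter name matches TIME_SPENT_ON_LOCK above:

use std::{sync::LazyLock, time::Instant};
use tokio::sync::Mutex;

// One process-wide async mutex: only one task writes to SurrealDB at a
// time, which sidesteps the concurrent-write errors noted in the TODO.
static LOCK: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(true));

// Hypothetical wrapper showing the pattern store_all uses inline.
async fn with_db_write_lock<T>(fut: impl std::future::Future<Output = T>) -> T {
    let now = Instant::now();
    let guard = LOCK.lock().await;
    // time spent blocked on the lock feeds the surql_lock_waiting_ms counter
    metrics::counter!("surql_lock_waiting_ms").increment(now.elapsed().as_millis() as u64);
    let out = fut.await;
    drop(guard);
    out
}
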

src/main.rs

@@ -55,14 +55,16 @@ async fn main() {
.open("./docker/logs/tracing.log")
.expect("Couldn't make log file!");
let filter = EnvFilter::from_default_env();
let registry = Registry::default().with(
fmt::layer()
.with_line_number(true)
.with_thread_ids(true)
.with_file(true)
// .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
.json()
.with_writer(writer), // .with_filter(EnvFilter::from_default_env())
.with_writer(writer)
.with_filter(filter)
);
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
@@ -111,7 +113,6 @@ async fn main() {
let site = Website::new(&starting_url, false);
process(site, db.clone(), reqwest.clone(), s3.clone()).await;
return;
drop(pre_loop_span);
let span = trace_span!("Loop");
@@ -154,7 +155,7 @@ async fn main() {
}
drop(span);
info!("Done");
debug!("Done");
drop(total_runtime);
}
@@ -164,10 +165,10 @@ async fn main() {
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
// METRICS
trace!("Process: {}", site.to_string());
trace!("Process: {}", &site.site);
let timer = Timer::start("Built request");
// Build the request
let request_builder = reqwest.get(site.to_string());
let request_builder = reqwest.get(&site.site.to_string());
// METRICS
timer.stop();
@@ -183,9 +184,8 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
timer.stop();
g.decrement(1);
counter!(GET_METRIC).increment(1);
debug!("Getting body...");
// Get body from respones
// Get body from response
let data = response
.text()
.await
@@ -198,7 +198,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
// update self in db
site.set_crawled();
site.store(&db).await;
Website::store_all(vec![site.clone()], &db).await;
// Store all the other sites so that we can link to them.
// let mut links_to = Vec::new();
@@ -206,8 +206,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
// Make the database's links reflect the html links between sites
site.links_to(others, &db).await;
} else {
error!("Failed to get: {}", &site.site);
}
error!("Failed to get: {}", site.to_string());
}
/// Returns uncrawled links
@@ -252,8 +253,6 @@ impl<'a> Timer<'a> {
if ms > 200. {
warn!("{}", format!("{} in {:.3}ms", self.msg, ms));
} else {
trace!("{}", format!("{} in {:.3}ms", self.msg, ms));
}
ms
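
For context, the Timer used throughout both files is roughly this shape; this is a reconstruction from the calls and the stop logic above, not the verbatim type:

use std::time::Instant;
use tracing::{trace, warn};

// Reconstructed sketch: started with a message, reports elapsed time on
// stop, and escalates to warn! when an operation takes over 200ms.
pub struct Timer<'a> {
    start: Instant,
    msg: &'a str,
}

impl<'a> Timer<'a> {
    pub fn start(msg: &'a str) -> Self {
        Self { start: Instant::now(), msg }
    }

    pub fn stop(&self) -> f64 {
        let ms = self.start.elapsed().as_micros() as f64 / 1000.;
        if ms > 200. {
            warn!("{} in {:.3}ms", self.msg, ms);
        } else {
            trace!("{} in {:.3}ms", self.msg, ms);
        }
        ms
    }
}
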

src/parser.rs

@@ -5,8 +5,6 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::instrument;
use crate::db::Website;

src/s3.rs

@@ -1,8 +1,3 @@
use base64::{
alphabet,
engine::{self, general_purpose},
Engine,
};
use metrics::counter;
use minio::s3::{
args::{BucketExistsArgs, MakeBucketArgs},
@@ -17,7 +12,7 @@ use url::Url;
use crate::{db::Website, Config, Timer};
const ROUND_TRIP_METRIC: &'static str = "s3_trips";
const S3_ROUND_TRIP_METRIC: &'static str = "s3_trips";
#[derive(Clone)]
pub struct S3 {
@@ -47,6 +42,7 @@ impl S3 {
.expect("Failed to check if bucket exists"),
)
.await?;
counter!(S3_ROUND_TRIP_METRIC).increment(1);
if !exists {
trace!("Creating bucket...");
@@ -56,23 +52,24 @@ impl S3 {
)
.await?;
}
counter!(S3_ROUND_TRIP_METRIC).increment(1);
trace!("Connection successful");
Ok(Self {
bucket_name: config.s3_bucket.to_owned(),
client: client,
client,
})
}
#[instrument(name = "s3_store", skip_all)]
pub async fn store(&self, data: &str, url: &Url) {
let counter = counter!(ROUND_TRIP_METRIC);
let counter = counter!(S3_ROUND_TRIP_METRIC);
let t = Timer::start("Stored page");
let _ = t; // prevent compiler drop
let filename = Website::get_url_as_string(url);
trace!("Created filename: {filename} from raw: {}", url.to_string());
let filename = Website::get_url_as_string(url);
trace!("Storing {} as {filename}", url.to_string());
counter.increment(1);
let _ = match &self
@@ -84,6 +81,7 @@ impl S3 {
Ok(_) => {}
Err(err) => match err {
Error::InvalidObjectName(_) => {
// This code will really only run if the URL has non-English chars
warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try.");
let filename: String = Website::get_url_as_b64_path(url);
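
That Base64 fallback leans on the URL-safe, unpadded engine declared at the top of db.rs. A self-contained sketch of the encoding, assuming base64 0.21+; the helper name url_to_b64_name is illustrative, mirroring Website::get_url_as_b64_path:

use base64::{alphabet, engine::{self, general_purpose}, Engine};

// URL-safe alphabet, no padding: output uses only [A-Za-z0-9_-], which is
// a valid S3 object name even when the source URL has non-ASCII bytes.
const CUSTOM_ENGINE: engine::GeneralPurpose =
    engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);

fn url_to_b64_name(url: &url::Url) -> String {
    CUSTOM_ENGINE.encode(url.as_str().as_bytes())
}
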