diff --git a/src/db.rs b/src/db.rs index 5f2a166..8aae6e8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,43 +4,55 @@ use base64::{ Engine, }; use metrics::counter; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; +use std::{fmt::Debug, sync::LazyLock, time::Instant}; use surrealdb::{ engine::remote::ws::{Client, Ws}, - error::Db, opt::auth::Root, - sql::{Query, Thing}, + sql::Thing, Error::Api, Response, Surreal, }; +use tokio::sync::Mutex; use tracing::{error, instrument, trace, warn}; use url::Url; use crate::{Config, Timer}; +// static LOCK: LazyLock>> = LazyLock::new(|| Arc::new(Mutex::new(true))); +static LOCK: LazyLock> = LazyLock::new(|| Mutex::new(true)); + const CUSTOM_ENGINE: engine::GeneralPurpose = engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD); -const ROUND_TRIP_METRIC: &'static str = "surql_trips"; +const TIME_SPENT_ON_LOCK: &'static str = "surql_lock_waiting_ms"; const STORE: &'static str = "surql_store_calls"; const LINK: &'static str = "surql_link_calls"; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Deserialize, Clone)] pub struct Website { /// The url that this data is found at pub site: Url, /// Wether or not this link has been crawled yet pub crawled: bool, - #[serde(skip_serializing)] - id: Option, +} + +impl Serialize for Website { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer { + let mut state = serializer.serialize_struct("Website", 2)?; + state.serialize_field("crawled", &self.crawled)?; + // to_string() calls the correct naming of site + state.serialize_field("site", &self.site.to_string())?; + state.end() + } } // manual impl to make tracing look nicer impl Debug for Website { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let site = (self.site.domain().unwrap_or("n/a")).to_string() + self.site.path(); - f.debug_struct("Website").field("site", &site).finish() + f.debug_struct("Website").field("site", &self.site).finish() } } @@ -52,9 +64,8 @@ impl Website { Err(_) => todo!(), }; Self { - id: None, crawled, - site, + site } } @@ -64,7 +75,13 @@ impl Website { } pub fn get_url_as_string(site: &Url) -> String { - let domain = site.domain().unwrap_or("DOMAIN").to_string(); + let domain = match site.domain() { + Some(s) => s.to_string(), + None => { + warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string()); + "localhost".to_string() + } + }; let path = site.path(); domain + path @@ -82,19 +99,19 @@ impl Website { if len == 0 { return; } + + let from = &self.site; - let from = self.site.to_string(); // let to = other.site.to_string(); - trace!("Linking {from} to {} other pages.", other.len()); - let msg = format!("Linked {len} pages"); + trace!("Linking {} pages to {from}", other.len()); + let msg = format!("Linked {len} pages to {from}"); let timer = Timer::start(&msg); // prevent the timer from being dropped instantly. let _ = timer; - counter!(ROUND_TRIP_METRIC).increment(1); counter!(LINK).increment(1); match db .query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)") - .bind(("in", from)) + .bind(("in", from.clone())) .bind(("out", other)) .await { @@ -106,15 +123,15 @@ impl Website { let _: Vec = vec; if let Some(num) = vec.get(0) { if *num == len { - trace!("Link OK"); + trace!("Link for {from} OK - {num}/{len}"); return; } else { - warn!("Didn't link all the records. {num}/{len}"); + error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e); return; } } } - warn!("Linking request succeeded but couldn't verify the results."); + error!("Linking request succeeded but couldn't verify the results."); } Err(e) => { error!("{}", e.to_string()); @@ -125,11 +142,17 @@ impl Website { // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE // if already in the database as such or incoming data is TRUE. pub async fn store_all(all: Vec, db: &Surreal) -> Vec { - counter!(ROUND_TRIP_METRIC).increment(1); counter!(STORE).increment(1); - let mut things = Vec::with_capacity(all.len()); + // TODO this only allows for one thread to be in the database at a time. + // This is currently required since otherwise we get write errors. + // If the default `crawled` is set to false, we might not need to write any more + // than just the name. `accessed_at` is fun but not needed. + let now = Instant::now(); + let lock = LOCK.lock().await; + counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64); + match db .query( "INSERT INTO website $array @@ -156,79 +179,9 @@ impl Website { error!("{:?}", err); } } + drop(lock); things } - - #[instrument(name = "surql_store", skip_all)] - pub async fn store(&self, db: &Surreal) -> Option { - counter!(STORE).increment(1); - let counter = counter!(ROUND_TRIP_METRIC); - let t = Timer::start("Stored link"); - let _ = t; - counter.increment(1); - - // check if it's been gone thru before - let mut response = db - .query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1") - .bind(("site", self.site.to_string())) - .await - .expect("Failed to check surreal for duplicates!"); - - if let Some(old) = response - .take::>(0) - .expect("Failed to read response from surreal for duplicates.") - { - // site exists already - if let Some(id) = old.id { - // make sure to preserve the "crawled status" - let mut new = self.clone(); - new.crawled = old.crawled | new.crawled; - - counter.increment(1); - // update the record - match db.upsert((id.tb, id.id.to_string())).content(new).await { - Ok(e) => { - if let Some(a) = e { - let _: Record = a; - return Some(a.id); - } - } - Err(e) => { - match e { - surrealdb::Error::Db(error) => match error { - Db::QueryCancelled => todo!(), - Db::QueryNotExecuted => todo!(), - Db::QueryNotExecutedDetail { message: _ } => todo!(), - _ => {} - }, - _ => {} - } - // error!("{}", e); - } - }; - } - } else { - counter.increment(1); - // sites hasn't existed yet - match db.create("website").content(self.clone()).await { - Ok(e) => { - let _: Option = e; - if let Some(a) = e { - let _: Record = a; - return Some(a.id); - } - } - Err(a) => error!("{:?}", a), - }; - } - None - } -} - -impl ToString for Website { - fn to_string(&self) -> String { - self.site.to_string() - } } #[derive(Debug, Serialize)] diff --git a/src/main.rs b/src/main.rs index 02816ce..1d02d56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,14 +55,16 @@ async fn main() { .open("./docker/logs/tracing.log") .expect("Couldn't make log file!"); + let filter = EnvFilter::from_default_env(); + let registry = Registry::default().with( fmt::layer() .with_line_number(true) .with_thread_ids(true) .with_file(true) - // .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically .json() - .with_writer(writer), // .with_filter(EnvFilter::from_default_env()) + .with_writer(writer) + .with_filter(filter) ); tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber"); @@ -111,7 +113,6 @@ async fn main() { let site = Website::new(&starting_url, false); process(site, db.clone(), reqwest.clone(), s3.clone()).await; - return; drop(pre_loop_span); let span = trace_span!("Loop"); @@ -154,7 +155,7 @@ async fn main() { } drop(span); - info!("Done"); + debug!("Done"); drop(total_runtime); } @@ -164,10 +165,10 @@ async fn main() { async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Client, s3: S3) { // METRICS - trace!("Process: {}", site.to_string()); + trace!("Process: {}", &site.site); let timer = Timer::start("Built request"); // Build the request - let request_builder = reqwest.get(site.to_string()); + let request_builder = reqwest.get(&site.site.to_string()); // METRICS timer.stop(); @@ -183,9 +184,8 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien timer.stop(); g.decrement(1); counter!(GET_METRIC).increment(1); - debug!("Getting body..."); - // Get body from respones + // Get body from response let data = response .text() .await @@ -198,7 +198,7 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien // update self in db site.set_crawled(); - site.store(&db).await; + Website::store_all(vec![site.clone()], &db).await; // Store all the other sites so that we can link to them. // let mut links_to = Vec::new(); @@ -206,8 +206,9 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien // Make the database's links reflect the html links between sites site.links_to(others, &db).await; + } else { + error!("Failed to get: {}", &site.site); } - error!("Failed to get: {}", site.to_string()); } /// Returns uncrawled links @@ -252,8 +253,6 @@ impl<'a> Timer<'a> { if ms > 200. { warn!("{}", format!("{} in {:.3}ms", self.msg, ms)); - } else { - trace!("{}", format!("{} in {:.3}ms", self.msg, ms)); } ms diff --git a/src/parser.rs b/src/parser.rs index 17c4622..fc6d5fe 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -5,8 +5,6 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult}; use html5ever::tokenizer::{StartTag, TagToken}; use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts}; use html5ever::{local_name, tendril::*}; -use surrealdb::engine::remote::ws::Client; -use surrealdb::Surreal; use tracing::instrument; use crate::db::Website; diff --git a/src/s3.rs b/src/s3.rs index 5d37aaf..f984606 100644 --- a/src/s3.rs +++ b/src/s3.rs @@ -1,8 +1,3 @@ -use base64::{ - alphabet, - engine::{self, general_purpose}, - Engine, -}; use metrics::counter; use minio::s3::{ args::{BucketExistsArgs, MakeBucketArgs}, @@ -17,7 +12,7 @@ use url::Url; use crate::{db::Website, Config, Timer}; -const ROUND_TRIP_METRIC: &'static str = "s3_trips"; +const S3_ROUND_TRIP_METRIC: &'static str = "s3_trips"; #[derive(Clone)] pub struct S3 { @@ -47,6 +42,7 @@ impl S3 { .expect("Failed to check if bucket exists"), ) .await?; + counter!(S3_ROUND_TRIP_METRIC).increment(1); if !exists { trace!("Creating bucket..."); @@ -56,23 +52,24 @@ impl S3 { ) .await?; } + counter!(S3_ROUND_TRIP_METRIC).increment(1); trace!("Connection successful"); Ok(Self { bucket_name: config.s3_bucket.to_owned(), - client: client, + client, }) } #[instrument(name = "s3_store", skip_all)] pub async fn store(&self, data: &str, url: &Url) { - let counter = counter!(ROUND_TRIP_METRIC); + let counter = counter!(S3_ROUND_TRIP_METRIC); let t = Timer::start("Stored page"); let _ = t; // prevent compiler drop - let filename = Website::get_url_as_string(url); - trace!("Created filename: {filename} from raw: {}", url.to_string()); + let filename = Website::get_url_as_string(url); + trace!("Storing {} as {filename}", url.to_string()); counter.increment(1); let _ = match &self @@ -84,6 +81,7 @@ impl S3 { Ok(_) => {} Err(err) => match err { Error::InvalidObjectName(_) => { + // This code will really only run if the url has non-english chars warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try."); let filename: String = Website::get_url_as_b64_path(url);