From 6409baaffb09c4d904ff89914dd5c8d480bd5587 Mon Sep 17 00:00:00 2001
From: Rushmore75
Date: Wed, 19 Mar 2025 12:41:08 -0600
Subject: [PATCH] Reduced trips to surreal by x500

---
 Crawler.toml           |  10 ++--
 docker/prometheus.yaml |   2 +-
 src/db.rs              | 104 +++++++++++++++++++++++++++++++++--------
 src/main.rs            |  86 ++++++++++++++++++++++------------
 src/parser.rs          |  60 ++++++++++--------------
 src/s3.rs              |  64 ++++++++++++-------------
 src/setup.surql        |   8 +++-
 7 files changed, 208 insertions(+), 126 deletions(-)

diff --git a/Crawler.toml b/Crawler.toml
index 29625a8..551c326 100644
--- a/Crawler.toml
+++ b/Crawler.toml
@@ -3,14 +3,14 @@ surreal_url = "localhost:8000"
 surreal_username = "root"
 surreal_password = "root"
 surreal_ns = "test"
-surreal_db = "v1.12"
+surreal_db = "v1.14.1"
 
 # Minio config
-s3_bucket = "v1.12"
+s3_bucket = "v1.14.1"
 s3_url = "http://localhost:9000"
-s3_access_key = "jLDPKGuu513VENc8kJwX"
-s3_secret_key = "4T1nymEzsGYOlKSAb1WX7V3stnQn9a5ZoTQjDfcL"
+s3_access_key = "3ptjsHhRHCHlpCmgFy9n"
+s3_secret_key = "68CmV07YExeCxb8kJhosSauEizj5CAE7PINZIfQz"
 
 # Crawler config
 crawl_filter = "en.wikipedia.com"
-budget = 200
\ No newline at end of file
+budget = 100
diff --git a/docker/prometheus.yaml b/docker/prometheus.yaml
index b8f5992..3047f38 100644
--- a/docker/prometheus.yaml
+++ b/docker/prometheus.yaml
@@ -7,7 +7,7 @@ scrape_configs:
     static_configs:
       # change this your machine's ip, localhost won't work
       # because localhost refers to the docker container.
-      - targets: ['192.168.8.209:2500']
+      - targets: ['172.20.239.48:2500']
   - job_name: loki
     static_configs:
       - targets: ['loki:3100']
diff --git a/src/db.rs b/src/db.rs
index bb0edac..5f2a166 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,14 +1,27 @@
-use std::fmt::Debug;
+use base64::{
+    alphabet,
+    engine::{self, general_purpose},
+    Engine,
+};
 use metrics::counter;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
 use surrealdb::{
-    engine::remote::ws::{Client, Ws}, error::Db, opt::auth::Root, sql::Thing, Response, Surreal
+    engine::remote::ws::{Client, Ws},
+    error::Db,
+    opt::auth::Root,
+    sql::{Query, Thing},
+    Error::Api,
+    Response, Surreal,
 };
 use tracing::{error, instrument, trace, warn};
 use url::Url;
 
 use crate::{Config, Timer};
 
+const CUSTOM_ENGINE: engine::GeneralPurpose =
+    engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);
+
 const ROUND_TRIP_METRIC: &'static str = "surql_trips";
 const STORE: &'static str = "surql_store_calls";
 const LINK: &'static str = "surql_link_calls";
@@ -50,11 +63,25 @@ impl Website {
         self.crawled = true
     }
 
+    pub fn get_url_as_string(site: &Url) -> String {
+        let domain = site.domain().unwrap_or("DOMAIN").to_string();
+        let path = site.path();
+
+        domain + path
+    }
+    pub fn get_url_as_b64_path(site: &Url) -> String {
+        let domain = site.domain().unwrap_or("DOMAIN").to_string();
+        let path = &CUSTOM_ENGINE.encode(site.path());
+
+        domain + path
+    }
+
     #[instrument(skip_all)]
     pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {
         let len = other.len();
-        if len == 0 {return}
+        if len == 0 {
+            return;
+        }
 
         let from = self.site.to_string();
         // let to = other.site.to_string();
@@ -88,13 +115,50 @@ impl Website {
                     }
                 }
                 warn!("Linking request succeeded but couldn't verify the results.");
-            },
+            }
             Err(e) => {
                 error!("{}", e.to_string());
-            },
+            }
         }
     }
 
+    // Insert every item in the vec into surreal; the crawled state will be preserved as TRUE
+    // if it is already in the database as such, or if the incoming data is TRUE.
+    pub async fn store_all(all: Vec<Website>, db: &Surreal<Client>) -> Vec<Thing> {
+        counter!(ROUND_TRIP_METRIC).increment(1);
+        counter!(STORE).increment(1);
+
+        let mut things = Vec::with_capacity(all.len());
+
+        match db
+            .query(
+                "INSERT INTO website $array
+                    ON DUPLICATE KEY UPDATE
+                        accessed_at = time::now(),
+                        crawled = crawled OR $input.crawled
+                    RETURN VALUE id;
+                ",
+            )
+            .bind(("array", all))
+            .await
+        {
+            Ok(mut id) => match id.take::<Vec<Thing>>(0) {
+                Ok(mut x) => things.append(&mut x),
+                Err(err) => match err {
+                    Api(error) => {
+                        eprintln!("{:?}", error);
+                        error!("{:?}", error);
+                    }
+                    _ => error!("{:?}", err),
+                },
+            },
+            Err(err) => {
+                error!("{:?}", err);
+            }
+        }
+        things
+    }
+
     #[instrument(name = "surql_store", skip_all)]
     pub async fn store(&self, db: &Surreal<Client>) -> Option<Thing> {
         counter!(STORE).increment(1);
@@ -110,7 +174,10 @@ impl Website {
             .await
             .expect("Failed to check surreal for duplicates!");
 
-        if let Some(old) = response.take::<Option<Website>>(0).expect("Failed to read response from surreal for duplicates.") {
+        if let Some(old) = response
+            .take::<Option<Website>>(0)
+            .expect("Failed to read response from surreal for duplicates.")
+        {
             // site exists already
             if let Some(id) = old.id {
                 // make sure to preserve the "crawled status"
@@ -128,15 +195,13 @@ impl Website {
                     }
                     Err(e) => {
                         match e {
-                            surrealdb::Error::Db(error) => {
-                                match error {
-                                    Db::QueryCancelled => todo!(),
-                                    Db::QueryNotExecuted => todo!(),
-                                    Db::QueryNotExecutedDetail { message: _ } => todo!(),
-                                    _=>{},
-                                }
+                            surrealdb::Error::Db(error) => match error {
+                                Db::QueryCancelled => todo!(),
+                                Db::QueryNotExecuted => todo!(),
+                                Db::QueryNotExecutedDetail { message: _ } => todo!(),
+                                _ => {}
                             },
-                            _=>{},
+                            _ => {}
                         }
                         // error!("{}", e);
                     }
@@ -193,15 +258,16 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
         .await?;
 
     // Select a specific namespace / database
-    db
-        .use_ns(&config.surreal_ns)
+    db.use_ns(&config.surreal_ns)
         .use_db(&config.surreal_db)
        .await?;
-    
+
     let setup = include_bytes!("setup.surql");
     let file = setup.iter().map(|c| *c as char).collect::<String>();
 
-    db.query(file).await.expect("Failed to setup surreal tables.");
+    db.query(file)
+        .await
+        .expect("Failed to setup surreal tables.");
 
     Ok(db)
 }
diff --git a/src/main.rs b/src/main.rs
index 42f65ae..02816ce 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,7 +2,12 @@
 extern crate html5ever;
 
-use std::{fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant};
+use std::{
+    fs::File,
+    io::Read,
+    net::{IpAddr, Ipv4Addr},
+    time::Instant,
+};
 
 use db::{connect, Website};
 use metrics::{counter, gauge};
@@ -11,7 +16,7 @@ use s3::S3;
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
 use tokio::task::JoinSet;
-use tracing::{debug, info, instrument, trace, trace_span, warn};
+use tracing::{debug, error, info, instrument, trace, trace_span, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
 mod db;
@@ -50,25 +55,24 @@ async fn main() {
         .open("./docker/logs/tracing.log")
         .expect("Couldn't make log file!");
 
-    let registry = Registry::default()
-        .with(
-            fmt::layer()
-                .with_line_number(true)
-                .with_thread_ids(true)
-                .with_file(true)
-                // .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
-                .json()
-                .with_writer(writer)
-                // .with_filter(EnvFilter::from_default_env())
-        );
+    let registry = Registry::default().with(
+        fmt::layer()
+            .with_line_number(true)
+            .with_thread_ids(true)
+            .with_file(true)
+            // .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
+            .json()
+            .with_writer(writer),
+            // .with_filter(EnvFilter::from_default_env())
+    );
 
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
-
     let builder = PrometheusBuilder::new();
-    builder.with_http_listener(
-        std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::from_octets([0,0,0,0])), 2500)
-    )
+    builder
+        .with_http_listener(std::net::SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])),
+            2500,
+        ))
         .install()
         .expect("failed to install recorder/exporter");
@@ -80,7 +84,6 @@ async fn main() {
     // let budget = 50;
     let mut crawled = 0;
 
-
     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
     let mut buf = String::new();
     let _ = file.read_to_string(&mut buf);
@@ -106,8 +109,9 @@ async fn main() {
     let pre_loop_span = span.enter();
     // Download the site
     let site = Website::new(&starting_url, false);
-    get(site, db.clone(), reqwest.clone(), s3.clone()).await;
+    process(site, db.clone(), reqwest.clone(), s3.clone()).await;
+
+    return;
 
     drop(pre_loop_span);
     let span = trace_span!("Loop");
@@ -133,7 +137,7 @@ async fn main() {
         let mut futures = JoinSet::new();
         for site in uncrawled {
             gauge!(BEING_PROCESSED).increment(1);
-            futures.spawn(get(site, db.clone(), reqwest.clone(), s3.clone()));
+            futures.spawn(process(site, db.clone(), reqwest.clone(), s3.clone()));
             // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
             // info!("Crawled {crawled} out of {budget} pages. ({percent})");
         }
@@ -154,36 +158,56 @@ async fn main() {
     drop(total_runtime);
 }
 
-#[instrument(skip (db, s3, reqwest))]
+#[instrument(skip(db, s3, reqwest))]
 /// Downloads and crawls and stores a webpage.
-/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
-async fn get(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
-    trace!("Get: {}", site.to_string());
-
+/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
+async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
+
+    // METRICS
+    trace!("Process: {}", site.to_string());
     let timer = Timer::start("Built request");
+    // Build the request
     let request_builder = reqwest.get(site.to_string());
+    // METRICS
     timer.stop();
 
+    // METRICS
     let g = gauge!(GET_IN_FLIGHT);
     g.increment(1);
     let timer = Timer::start("Got page");
+    // Send the http request (get)
     if let Ok(response) = request_builder.send().await {
+        // METRICS
         timer.stop();
         g.decrement(1);
         counter!(GET_METRIC).increment(1);
 
         debug!("Getting body...");
 
-        // Get body
-        let data = response.text().await.expect("Failed to read http response's body!");
+        // Get body from response
+        let data = response
+            .text()
+            .await
+            .expect("Failed to read http response's body!");
         // Store document
         s3.store(&data, &site.site).await;
-        // Parse document and store relationships
-        parser::parse(&db, &mut site, &data).await;
-        return;
+
+        // Parse document and get relationships
+        let sites = parser::parse(&site, &data).await;
+
+        // update self in db
+        site.set_crawled();
+        site.store(&db).await;
+
+        // Store all the other sites so that we can link to them.
+        // let mut links_to = Vec::new();
+        let others = Website::store_all(sites, &db).await;
+
+        // Make the database's links reflect the html links between sites
+        site.links_to(others, &db).await;
     }
-    trace!("Failed to get: {}", site.to_string());
+    error!("Failed to get: {}", site.to_string());
 }
 
 /// Returns uncrawled links
diff --git a/src/parser.rs b/src/parser.rs
index 0e5dc38..17c4622 100644
--- a/src/parser.rs
+++ b/src/parser.rs
@@ -40,7 +40,9 @@ impl TokenSink for Website {
                             // Set url
                             let mut url = web.site;
                             url.set_fragment(None); // removes #xyz
-                            let joined = url.join(&attr.value).expect("Failed to join url during parsing!");
+                            let joined = url
+                                .join(&attr.value)
+                                .expect("Failed to join url during parsing!");
                             web.site = joined;
                             web.crawled = false;
 
@@ -65,45 +67,31 @@ impl TokenSink for Website {
 }
 
 #[instrument(skip_all)]
-pub async fn parse(db: &Surreal<Client>, site: &mut Website, data: &str) {
-    // update self in db
-    site.set_crawled();
-    site.store(db).await;
-
+/// Parses the passed site and returns all the sites it links to.
+pub async fn parse(site: &Website, data: &str) -> Vec<Website> {
     // prep work
     let mut other_sites: Vec<Website> = Vec::new();
-    { // using blocks to prevent compiler's async worries
-        let _t = Timer::start("Parsed page");
+    let _t = Timer::start("Parsed page");
 
-        // change data into something that can be tokenized
-        let chunk = Tendril::from_str(&data).expect("Failed to parse string into Tendril!");
-        // create buffer of tokens and push our input into it
-        let mut token_buffer = BufferQueue::default();
-        token_buffer.push_back(chunk.try_reinterpret::<fmt::UTF8>().expect("Failed to reinterprt chunk!"));
-        // create the tokenizer
-        let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
+    // change data into something that can be tokenized
+    let chunk = Tendril::from_str(&data).expect("Failed to parse string into Tendril!");
+    // create buffer of tokens and push our input into it
+    let mut token_buffer = BufferQueue::default();
+    token_buffer.push_back(
+        chunk
+            .try_reinterpret::<fmt::UTF8>()
+            .expect("Failed to reinterpret chunk!"),
+    );
+    // create the tokenizer
+    let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
 
-        // go thru buffer
-        while let TokenizerResult::Script(mut sites) = tokenizer.feed(&mut token_buffer) {
-            other_sites.append(&mut sites);
-            // other_sites.push(sites);
-        }
-
-        assert!(token_buffer.is_empty());
-        tokenizer.end();
+    // go thru buffer
+    while let TokenizerResult::Script(mut sites) = tokenizer.feed(&mut token_buffer) {
+        other_sites.append(&mut sites);
+        // other_sites.push(sites);
     }
+    assert!(token_buffer.is_empty());
+    tokenizer.end();
 
-    {
-        let mut links_to = Vec::with_capacity(other_sites.len());
-
-        for a in other_sites {
-
-            let other = a.store(db).await;
-            if let Some(o) = other {
-                links_to.push(o);
-            }
-        }
-
-        site.links_to(links_to, db).await;
-    }
+    other_sites
 }
diff --git a/src/s3.rs b/src/s3.rs
index 99bd91a..5d37aaf 100644
--- a/src/s3.rs
+++ b/src/s3.rs
@@ -1,4 +1,8 @@
-use base64::{alphabet, engine::{self, general_purpose}, Engine};
+use base64::{
+    alphabet,
+    engine::{self, general_purpose},
+    Engine,
+};
 use metrics::counter;
 use minio::s3::{
     args::{BucketExistsArgs, MakeBucketArgs},
@@ -11,9 +15,7 @@ use minio::s3::{
 use tracing::{instrument, trace, warn};
 use url::Url;
 
-use crate::{Config, Timer};
-
-const CUSTOM_ENGINE: engine::GeneralPurpose = engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);
+use crate::{db::Website, Config, Timer};
 
 const ROUND_TRIP_METRIC: &'static str = "s3_trips";
@@ -68,38 +70,34 @@ impl S3 { let counter = counter!(ROUND_TRIP_METRIC); let t = Timer::start("Stored page"); let _ = t; // prevent compiler drop - if let Some(domain) = url.domain() { - let filename = domain.to_owned() + url.path(); + let filename = Website::get_url_as_string(url); - trace!("Created filename: {filename} from raw: {}", url.to_string()); + trace!("Created filename: {filename} from raw: {}", url.to_string()); - counter.increment(1); - let _ = match &self - .client - .put_object_content(&self.bucket_name, &filename, data.to_owned()) - .send() - .await { - Ok(_) => {}, - Err(err) => { - match err { - Error::InvalidObjectName(_) => { + counter.increment(1); + let _ = match &self + .client + .put_object_content(&self.bucket_name, &filename, data.to_owned()) + .send() + .await + { + Ok(_) => {} + Err(err) => match err { + Error::InvalidObjectName(_) => { + warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try."); - warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try."); - - let filename: String = domain.to_owned() + &CUSTOM_ENGINE.encode(url.path()); + let filename: String = Website::get_url_as_b64_path(url); - counter.increment(1); - let _ = &self - .client - .put_object_content(&self.bucket_name, &filename, data.to_owned()) - .send() - .await - .unwrap(); - }, - _ => {}, - } - }, - }; - } + counter.increment(1); + let _ = &self + .client + .put_object_content(&self.bucket_name, &filename, data.to_owned()) + .send() + .await + .unwrap(); + } + _ => {} + }, + }; } } diff --git a/src/setup.surql b/src/setup.surql index 61c020c..17b89f5 100644 --- a/src/setup.surql +++ b/src/setup.surql @@ -1,3 +1,9 @@ DEFINE TABLE IF NOT EXISTS website SCHEMALESS; -DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now(); + +DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string; DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE; + +DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool; + +DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now(); +DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();