From f7bb0eef16ac6515d4badc2f6ec45f1cbe8a6b91 Mon Sep 17 00:00:00 2001
From: Rushmore75
Date: Wed, 16 Jul 2025 11:47:42 -0600
Subject: [PATCH] turn program into `batch_size` parallel downloaders

---
 src/main.rs | 98 ++++++++++++++++++++++++-----------------------------
 1 file changed, 44 insertions(+), 54 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 86c3f82..c2550b8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,12 +1,11 @@
 #![feature(ip_from)]
 #![feature(path_add_extension)]
-#![warn(clippy::expect_used)]
 #![deny(clippy::unwrap_used)]
 
 extern crate html5ever;
 
 use std::{
-    collections::HashSet, fs::File, io::Read, sync::LazyLock
+    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
 };
 
 use futures_util::StreamExt;
@@ -15,7 +14,7 @@ use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet};
+use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
@@ -57,7 +56,8 @@ static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
         .build()
 );
 
-static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
+// FIXME Traces aren't working on multiple threads, they block
+// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
 
 #[derive(Deserialize)]
 struct Config {
@@ -137,18 +137,16 @@ async fn main() {
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
     // End LOGGING
 
-    info!("Starting...");
-
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
-    let mut crawled = 0;
+    let crawled = Arc::new(RwLock::new(0));
 
     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
     let mut buf = String::new();
    let _ = file.read_to_string(&mut buf);
-    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
+    let config: Arc<Config> = Arc::new(toml::from_str(&buf).expect("Failed to parse Crawler.toml"));
 
     let starting_url = &config.start_url;
 
     let db = connect(&config)
@@ -163,49 +161,39 @@ async fn main() {
 
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
-    let mut span = TRACER.start("Pre-Loop");
+    // let mut span = TRACER.start("Pre-Loop");
     let site = Website::new(starting_url, false);
     process(site, db.clone(), reqwest.clone()).await;
-    span.end();
-    // Download the site
+    // span.end();
 
-    let mut main_loop_span= TRACER.start("Main-Loop");
-    while crawled < config.budget {
-        let uncrawled =
-            get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
-        if uncrawled.is_empty() {
-            info!("Had more budget but finished crawling everything.");
-            return;
-        }
-
-        {
-            let mut futures = JoinSet::new();
-            for site in uncrawled {
-                futures.spawn(process(site, db.clone(), reqwest.clone()));
-            }
-
-            // As futures complete runs code in while block
-            while futures.join_next().await.is_some() {
-                SITES_CRAWLED.add(1, &[]);
-                crawled += 1;
-            }
-        }
-    }
-    main_loop_span.end();
-
-    if let Ok(mut ok) = db
-        .query("count(select id from website where crawled = true)")
-        .await
-    {
-        let res = ok.take::<Option<usize>>(0);
-        if let Ok(Some(n)) = res {
-            info!("Total crawled pages now equals {n}");
-        }
+    // let mut main_loop_span= TRACER.start("Main-Loop");
+    let mut futures = JoinSet::new();
+    for _ in 0..config.batch_size {
+        futures.spawn(process_single_thread(config.clone(), db.clone(), reqwest.clone(), crawled.clone()));
     }
+    futures.join_all().await;
+    // main_loop_span.end();
 
     info!("Done");
 }
 
+async fn process_single_thread(config: Arc<Config>, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
+    while *(crawled.read().await) < config.budget {
+        let uncrawled = get_uncrawled_links(&db.clone(), 1, &config).await;
+        if uncrawled.is_empty() {
+            return
+        }
+
+        for site in uncrawled {
+            process(site, db.clone(), reqwest.clone()).await;
+            SITES_CRAWLED.add(1, &[]);
+            // Somehow this write doesn't hang on the while's read?
+            let mut c = crawled.write().await;
+            *c += 1;
+        }
+    }
+}
+
 #[instrument(skip(db, reqwest))]
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
@@ -214,7 +202,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     // METRICS
     trace!("Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
-    let mut process_span = TRACER.start("Process");
+    // let mut process_span = TRACER.start("Process");
 
     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
@@ -226,6 +214,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     let headers = response.headers();
     let code = response.status();
 
+    if code != 200 {
+        warn!("{code} for {}", site.site.as_str());
+    }
     #[allow(non_snake_case)]
     let CT = headers.get("Content-Type");
 
@@ -261,9 +252,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         let mut buf: Vec<u8> = Vec::new();
 
         // Write file to disk
-        trace!("Writing at: {:?}", path);
+        trace!("Writing at: {:?}", tmp_path);
         BEING_STREAMED.add(1, &[]);
-        let mut stream_span = TRACER.start("Stream");
+        // let mut stream_span = TRACER.start("Stream");
         while let Some(data) = stream.next().await {
             match data {
                 Ok(data) => {
@@ -277,23 +268,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                     }
                 },
                 Err(err) => {
-                    eprintln!("{}", err)
+                    error!("{err}")
                 },
             }
         }
-        let _ = writer.flush();
+        let _ = writer.flush().await;
        // rename the temp file into the real file name
         if let Err(err) = tokio::fs::rename(tmp_path, path).await {
             error!("{}", err);
         }
 
-        stream_span.end();
+        // stream_span.end();
         BEING_STREAMED.add(-1, &[]);
 
         // (If needed) Parse the file
         if should_parse {
             BEING_PARSED.add(1, &[]);
-            let mut parsing_span = TRACER.start("Parsing");
+            // let mut parsing_span = TRACER.start("Parsing");
 
             // Parse document and get relationships
             let sites = parser::parse(&site, &buf).await;
@@ -309,7 +300,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
             // Store all the other sites so that we can link to them.
             let _ = Website::store_all(de_dupe_sites, &db).await;
 
-            parsing_span.end();
+            // parsing_span.end();
             BEING_PARSED.add(-1, &[]);
         }
 
@@ -324,7 +315,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         error!("Failed to get: {}", &site.site);
     }
 
-    process_span.end();
+    // process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }
 
@@ -333,7 +324,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 async fn get_uncrawled_links(
     db: &Surreal<Client>,
     mut count: usize,
-    filter: String,
     config: &Config,
 ) -> Vec<Website> {
     if count > config.batch_size {
@@ -344,7 +334,7 @@ async fn get_uncrawled_links(
 
     let mut response = db
         .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
-        .bind(("format", filter))
+        .bind(("format", config.crawl_filter.to_string()))
         .bind(("count", count))
         .await
         .expect("Hard-coded query failed..?");
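
Illustrative sketch of the worker pattern the patch introduces, not the patch itself: `batch_size` tasks share one budget counter behind an Arc<RwLock<usize>> and stop once it reaches `budget`. A sleep stands in for `process()`; the names `batch_size` and `budget` mirror the Crawler.toml fields, everything else here is assumed for the example.

    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::RwLock;
    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let batch_size = 4;   // stands in for config.batch_size
        let budget = 20usize; // stands in for config.budget
        let crawled = Arc::new(RwLock::new(0usize));

        let mut workers = JoinSet::new();
        for id in 0..batch_size {
            let crawled = crawled.clone();
            workers.spawn(async move {
                // Same shape as process_single_thread: the read guard in the
                // condition is a temporary and is dropped before the body runs,
                // so taking the write lock below cannot deadlock on it.
                while *crawled.read().await < budget {
                    tokio::time::sleep(Duration::from_millis(10)).await; // placeholder for process()
                    let mut c = crawled.write().await;
                    *c += 1;
                    println!("worker {id}: {} / {budget} crawled", *c);
                }
            });
        }
        workers.join_all().await;
    }

As in the patch, the budget check and the increment are separate lock acquisitions, so the final count can overshoot `budget` by up to one page per worker.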