turn program into batch_size parallel downloaders

Rushmore75 2025-07-16 11:47:42 -06:00
parent 865f9be8c0
commit f7bb0eef16


@@ -1,12 +1,11 @@
 #![feature(ip_from)]
 #![feature(path_add_extension)]
-#![warn(clippy::expect_used)]
 #![deny(clippy::unwrap_used)]
 
 extern crate html5ever;
 
 use std::{
-    collections::HashSet, fs::File, io::Read, sync::LazyLock
+    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
 };
 
 use futures_util::StreamExt;
@@ -15,7 +14,7 @@ use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet};
+use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
@@ -57,7 +56,8 @@ static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
         .build()
 );
-static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
+// FIXME Traces aren't working on multiple threads, they block
+// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
 
 #[derive(Deserialize)]
 struct Config {
@@ -137,18 +137,16 @@ async fn main() {
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
     // End LOGGING
 
-    info!("Starting...");
-
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
-    let mut crawled = 0;
+    let crawled = Arc::new(RwLock::new(0));
 
     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
     let mut buf = String::new();
     let _ = file.read_to_string(&mut buf);
-    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
+    let config: Arc<Config> = Arc::new(toml::from_str(&buf).expect("Failed to parse Crawler.toml"));
     let starting_url = &config.start_url;
 
     let db = connect(&config)
@@ -163,49 +161,39 @@ async fn main() {
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
-    let mut span = TRACER.start("Pre-Loop");
+    // let mut span = TRACER.start("Pre-Loop");
     let site = Website::new(starting_url, false);
     process(site, db.clone(), reqwest.clone()).await;
-    span.end();
-    // Download the site
-    let mut main_loop_span= TRACER.start("Main-Loop");
-    while crawled < config.budget {
-        let uncrawled =
-            get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
-        if uncrawled.is_empty() {
-            info!("Had more budget but finished crawling everything.");
-            return;
-        }
-
-        {
-            let mut futures = JoinSet::new();
-            for site in uncrawled {
-                futures.spawn(process(site, db.clone(), reqwest.clone()));
-            }
-            // As futures complete runs code in while block
-            while futures.join_next().await.is_some() {
-                SITES_CRAWLED.add(1, &[]);
-                crawled += 1;
-            }
-        }
-    }
-    main_loop_span.end();
-
-    if let Ok(mut ok) = db
-        .query("count(select id from website where crawled = true)")
-        .await
-    {
-        let res = ok.take::<Option<usize>>(0);
-        if let Ok(Some(n)) = res {
-            info!("Total crawled pages now equals {n}");
-        }
+    // span.end();
+
+    // let mut main_loop_span= TRACER.start("Main-Loop");
+    let mut futures = JoinSet::new();
+    for _ in 0..config.batch_size {
+        futures.spawn(process_single_thread(config.clone(), db.clone(), reqwest.clone(), crawled.clone()));
     }
+    futures.join_all().await;
+    // main_loop_span.end();
 
     info!("Done");
 }
 
+async fn process_single_thread(config: Arc<Config>, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
+    while *(crawled.read().await) < config.budget {
+        let uncrawled = get_uncrawled_links(&db.clone(), 1, &config).await;
+        if uncrawled.is_empty() {
+            return
+        }
+        for site in uncrawled {
+            process(site, db.clone(), reqwest.clone()).await;
+            SITES_CRAWLED.add(1, &[]);
+            // Somehow this write doesn't hang on the while's read?
+            let mut c = crawled.write().await;
+            *c += 1;
+        }
+    }
+}
+
 #[instrument(skip(db, reqwest))]
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
@@ -214,7 +202,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     // METRICS
     trace!("Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
-    let mut process_span = TRACER.start("Process");
+    // let mut process_span = TRACER.start("Process");
 
     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
@@ -226,6 +214,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     let headers = response.headers();
     let code = response.status();
 
+    if code != 200 {
+        warn!("{code} for {}", site.site.as_str());
+    }
     #[allow(non_snake_case)]
     let CT = headers.get("Content-Type");
@@ -261,9 +252,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         let mut buf: Vec<u8> = Vec::new();
 
         // Write file to disk
-        trace!("Writing at: {:?}", path);
+        trace!("Writing at: {:?}", tmp_path);
         BEING_STREAMED.add(1, &[]);
-        let mut stream_span = TRACER.start("Stream");
+        // let mut stream_span = TRACER.start("Stream");
         while let Some(data) = stream.next().await {
             match data {
                 Ok(data) => {
@@ -277,23 +268,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                     }
                 },
                 Err(err) => {
-                    eprintln!("{}", err)
+                    error!("{err}")
                 },
             }
         }
-        let _ = writer.flush();
+        let _ = writer.flush().await;
         // rename the temp file into the real file name
         if let Err(err) = tokio::fs::rename(tmp_path, path).await {
             error!("{}", err);
         }
 
-        stream_span.end();
+        // stream_span.end();
         BEING_STREAMED.add(-1, &[]);
 
         // (If needed) Parse the file
         if should_parse {
             BEING_PARSED.add(1, &[]);
-            let mut parsing_span = TRACER.start("Parsing");
+            // let mut parsing_span = TRACER.start("Parsing");
 
             // Parse document and get relationships
             let sites = parser::parse(&site, &buf).await;
@@ -309,7 +300,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 
             // Store all the other sites so that we can link to them.
             let _ = Website::store_all(de_dupe_sites, &db).await;
 
-            parsing_span.end();
+            // parsing_span.end();
             BEING_PARSED.add(-1, &[]);
         }
@@ -324,7 +315,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         error!("Failed to get: {}", &site.site);
     }
 
-    process_span.end();
+    // process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }
@@ -333,7 +324,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 async fn get_uncrawled_links(
     db: &Surreal<Client>,
     mut count: usize,
-    filter: String,
     config: &Config,
 ) -> Vec<Website> {
     if count > config.batch_size {
@@ -344,7 +334,7 @@ async fn get_uncrawled_links(
 
     let mut response = db
         .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
-        .bind(("format", filter))
+        .bind(("format", config.crawl_filter.to_string()))
        .bind(("count", count))
        .await
        .expect("Hard-coded query failed..?");