#![feature(ip_from)]

extern crate html5ever;

use std::{
    collections::HashSet,
    fs::File,
    io::Read,
    net::{IpAddr, Ipv4Addr},
};

use db::{connect, Website};
use metrics::{counter, gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use serde::Deserialize;
use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::task::JoinSet;
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};

mod db;
mod parser;
mod filesystem;

const GET_METRIC: &str = "total_gets";
const GET_IN_FLIGHT: &str = "gets_in_flight";
const SITES_CRAWLED: &str = "pages_crawled";
const BEING_PROCESSED: &str = "pages_being_processed";

#[derive(Deserialize)]
struct Config {
    surreal_ns: String,
    surreal_db: String,
    surreal_url: String,
    surreal_username: String,
    surreal_password: String,

    crawl_filter: String,
    budget: usize,
}

#[tokio::main]
async fn main() {
    let writer = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open("./docker/logs/tracing.log")
        .expect("Couldn't make log file!");

    let filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::DEBUG.into())
        .from_env_lossy();

    let registry = Registry::default().with(
        fmt::layer()
            .with_line_number(true)
            .with_thread_ids(true)
            .with_file(true)
            .json()
            .with_writer(writer)
            .with_filter(filter),
    );

    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");

    let builder = PrometheusBuilder::new();
    builder
        .with_http_listener(std::net::SocketAddr::new(
            IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])),
            2500,
        ))
        .install()
        .expect("failed to install recorder/exporter");

    info!("Starting...");

    // Would probably take these in as parameters from a cli
    let starting_url = "https://en.wikipedia.org/";
    // When getting uncrawled pages, the site name must contain this string. "" effectively disables the filter.
    // let crawl_filter = "en.wikipedia.org/";
    // let budget = 50;
    let mut crawled = 0;

    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
    let mut buf = String::new();
    let _ = file.read_to_string(&mut buf);

    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");

    let db = connect(&config)
        .await
        .expect("Failed to connect to surreal, aborting.");

    let reqwest = reqwest::Client::builder()
        // .use_rustls_tls()
        .gzip(true)
        .build()
        .expect("Failed to build reqwest client.");

    // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
    // get() to work.
    let span = trace_span!("Pre-Loop");
    let pre_loop_span = span.enter();
    // Download the site
    let site = Website::new(starting_url, false);
    process(site, db.clone(), reqwest.clone()).await;

    drop(pre_loop_span);

    let span = trace_span!("Loop");
    let span = span.enter();
    while crawled < config.budget {
        let get_num = if config.budget - crawled < 100 {
            config.budget - crawled
        } else {
            100
        };

        let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
        if uncrawled.is_empty() {
            info!("Had more budget but finished crawling everything.");
            return;
        }

        {
            let mut futures = JoinSet::new();
            for site in uncrawled {
                gauge!(BEING_PROCESSED).increment(1);
                futures.spawn(process(site, db.clone(), reqwest.clone()));
                // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
                // info!("Crawled {crawled} out of {budget} pages. ({percent})");
            }

            let c = counter!(SITES_CRAWLED);
            // Runs the loop body each time one of the spawned futures completes.
            while futures.join_next().await.is_some() {
                c.increment(1);
                gauge!(BEING_PROCESSED).decrement(1);
                crawled += 1;
            }
        }
    }
    drop(span);

    info!("Done");
}

/// Downloads, crawls, and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
#[instrument(skip(db, reqwest))]
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
    // METRICS
    trace!("Process: {}", &site.site);

    // Build the request
    let request_builder = reqwest.get(site.site.to_string());

    // METRICS
    let g = gauge!(GET_IN_FLIGHT);
    g.increment(1);

    // Send the http request (get)
    if let Ok(response) = request_builder.send().await {
        // METRICS
        g.decrement(1);
        counter!(GET_METRIC).increment(1);

        // Get body from response
        let data = response
            .text()
            .await
            .expect("Failed to read http response's body!");

        // Store document
        filesystem::store(&data, &site.site).await;

        // Parse document and get relationships
        let sites = parser::parse(&site, &data).await;

        // Update self in db
        site.set_crawled();
        Website::store_all(vec![site], &db).await;

        // De-duplicate this list
        let prev_len = sites.len();
        let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
            set.insert(item);
            set
        });
        let de_dupe_sites: Vec<Website> = set.into_iter().collect();
        let diff = prev_len - de_dupe_sites.len();
        trace!("Saved {diff} from being entered into the db by de-duping");

        // Store all the other sites so that we can link to them.
        let _ = Website::store_all(de_dupe_sites, &db).await;
    } else {
        error!("Failed to get: {}", &site.site);
    }
}

/// Returns uncrawled links
#[instrument(skip(db))]
async fn get_uncrawled_links(
    db: &Surreal<Client>,
    mut count: usize,
    filter: String,
) -> Vec<Website> {
    if count > 100 {
        count = 100
    }
    debug!("Getting uncrawled links");

    let mut response = db
        .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
        .bind(("format", filter))
        .bind(("count", count))
        .await
        .expect("Hard-coded query failed..?");

    response
        .take(0)
        .expect("Returned websites couldn't be parsed")
}