diff --git a/src/filesystem.rs b/src/filesystem.rs
index 9432bfa..04e6ce1 100644
--- a/src/filesystem.rs
+++ b/src/filesystem.rs
@@ -34,10 +34,35 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     url_path
 }
 
+pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
+    match tokio::fs::OpenOptions::new()
+        .write(false)
+        .read(true)
+        .create(false)
+        .open(file).await
+    {
+        Ok(file) => {
+            match file.metadata().await {
+                Ok(meta) => {
+                    return Some(meta.len())
+                },
+                Err(err) => {
+                    error!("Failed to get metadata. {}", err)
+                },
+            }
+        },
+        Err(err) => {
+            error!("Failed to open file for testing... {}", err);
+        },
+    }
+    None
+
+}
+
 pub async fn init(filename: &PathBuf) -> Option<tokio::fs::File> {
     let file = async || tokio::fs::OpenOptions::new()
         .write(true)
-        .append(true)
+        .append(false)
         .create(true)
         .open(&filename).await;
 
diff --git a/src/main.rs b/src/main.rs
index 517d765..5af7166 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,17 +5,27 @@ extern crate html5ever;
 
 use std::{
-    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
+    collections::HashSet,
+    fs::File,
+    io::Read,
+    sync::{Arc, LazyLock},
 };
 
-use futures_util::StreamExt;
-use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
-use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
+use futures_util::StreamExt;
+use opentelemetry::{
+    global::{self},
+    metrics::{Counter, Meter, UpDownCounter},
+};
+use opentelemetry_otlp::{Protocol, WithExportConfig};
 use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
+use tokio::{
+    io::{AsyncWriteExt, BufWriter},
+    sync::RwLock,
+    task::JoinSet,
+};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
@@ -26,41 +36,29 @@ mod filesystem;
 mod parser;
 
 static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
-static BATCH_SIZE: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_batch_size")
-        .build()
-);
-static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+static BATCH_SIZE: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
+static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_processed")
        .build()
-);
-static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_parsed")
        .build()
-);
-static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_streamed")
        .build()
-);
-static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
-    METER
-        .i64_up_down_counter("crawler_gets_in_flight")
-        .build()
-);
-static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_bytes_down")
-        .build()
-);
-static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_sites_crawled")
-        .build()
-);
+});
+static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
+    LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
+static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
+static SITES_CRAWLED: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
 
 static CONFIG: LazyLock<Config> = LazyLock::new(|| {
     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
@@ -130,15 +128,25 @@ async fn main() {
     // let mut main_loop_span= TRACER.start("Main-Loop");
     let mut futures = JoinSet::new();
     for _ in 0..CONFIG.batch_size {
-        futures.spawn(process_single_thread(&CONFIG, db.clone(), reqwest.clone(), crawled.clone()));
+        futures.spawn(process_single_thread(
+            &CONFIG,
+            db.clone(),
+            reqwest.clone(),
+            crawled.clone(),
+        ));
     }
 
     while let Some(_) = futures.join_next().await {
         // Budget - Threads - This thread (1)
        // Would roughly be the acceptable amount at which a thread should exit
-        if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size -1 {
+        if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
             warn!("Thread terminated early, restarting");
-            futures.spawn(process_single_thread(&CONFIG, db.clone(), reqwest.clone(), crawled.clone()));
+            futures.spawn(process_single_thread(
+                &CONFIG,
+                db.clone(),
+                reqwest.clone(),
+                crawled.clone(),
+            ));
         }
     }
 
@@ -148,7 +156,12 @@ async fn main() {
     info!("Done");
 }
 
-async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
+async fn process_single_thread(
+    config: &Config,
+    db: Surreal<Client>,
+    reqwest: reqwest::Client,
+    crawled: Arc<RwLock<usize>>,
+) {
     while *(crawled.read().await) < config.budget {
         let uncrawled = get_next(&db.clone(), &config).await;
         match uncrawled {
@@ -158,11 +171,11 @@ async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: re
                 // Somehow this write doesn't hang on the while's read?
                 let mut c = crawled.write().await;
                 *c += 1;
-            },
+            }
             None => {
                 warn!("fn::get_next() returned None");
                 return;
-            },
+            }
         }
     }
 }
@@ -171,7 +184,6 @@ async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: re
 
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
-    // METRICS
     debug!(url = &site.site.as_str(), "Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
@@ -179,10 +191,12 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 
     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
-    
+
     // Send the http request (get)
     GET_IN_FLIGHT.add(1, &[]);
     if let Ok(response) = request_builder.send().await {
+        let mut skip_download = false;
+
         GET_IN_FLIGHT.add(-1, &[]);
 
         let headers = response.headers();
@@ -195,104 +209,132 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 
         let CT = headers.get("Content-Type");
         let ct = headers.get("content-type");
 
-        let ct = match (CT,ct) {
+        let ct = match (CT, ct) {
             (None, None) => {
-                warn!("Server did not respond with Content-Type header. Url: {} Headers: ({:?})", site.site.to_string(), headers);
-                return
-            },
+                warn!(
+                    "Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
+                    site.site.to_string(),
+                    headers
+                );
+                return;
+            }
             (None, Some(a)) => a,
             (Some(a), None) => a,
             (Some(a), Some(_)) => a,
         };
 
         // create filepath (handles / -> /index.html)
-        let path = filesystem::as_path(&site.site, ct);
-        let mut tmp_path= path.clone();
+        let real_path = filesystem::as_path(&site.site, ct);
+        let mut tmp_path = real_path.clone();
         if !(tmp_path.add_extension("crawl_temp")) {
             warn!("Failed to add extension to file");
             // fallback ig
             tmp_path = tmp_path.with_extension("crawl_temp");
         }
 
-        // make sure that the file is good to go
-        if let Some(file) = filesystem::init(&tmp_path).await {
-            // Get body from response
-            // stream the response onto the disk
-            let mut stream = response.bytes_stream();
-
-            let should_parse = path.to_string_lossy().ends_with(".html");
-            let mut writer = BufWriter::new(file);
-            let mut buf: Vec<u8> = Vec::new();
-
-            // Write file to disk
-            trace!("Writing at: {:?}", tmp_path);
-            BEING_STREAMED.add(1, &[]);
-            // let mut stream_span = TRACER.start("Stream");
-            while let Some(data) = stream.next().await {
-                match data {
-                    Ok(data) => {
-                        TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
-                        let _ = writer.write_all(&data).await;
-                        // If we are going to parse this file later, we will save it
-                        // into memory as well as the disk.
-                        // We do this because the data here might be incomplete
-                        if should_parse {
-                            data.iter().for_each(|f| buf.push(*f));
+        // CODE FOR UPDATING DOWNLOADED CONTENT:
+        // Check the Content-Length header (we assume the server is telling the truth) (I don't see
+        // a reason for it to lie in this case).
+        // And see if the file on the disk is the same length.
+        // Yes, technically this isn't perfect, but the other option is storing ETags, which I
+        // don't want to do right now.
+        if let Some(len) = headers.get("Content-Length") {
+            if let Ok(s) = len.to_str() {
+                // length is in bytes
+                if let Ok(len) = s.parse::<u64>() {
+                    if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
+                        if disk_len == len {
+                            skip_download = true;
                         }
-                    },
-                    Err(err) => {
-                        error!("{err}")
-                    },
+                    }
                 }
             }
-            let _ = writer.flush().await;
-            // rename the temp file into the real file name
-            if let Err(err) = tokio::fs::rename(&tmp_path, &path).await {
-                error!(
-                    from = &*tmp_path.to_string_lossy(),
-                    to = &*path.to_string_lossy(),
-                    "Error renaming file: {}",
-                    err
-                );
+        }
+
+
+        if skip_download {
+            trace!("Skipping download...");
+        } else {
+            // make sure that the file is good to go
+            if let Some(file) = filesystem::init(&tmp_path).await {
+                // Get body from response
+                // stream the response onto the disk
+                let mut stream = response.bytes_stream();
+
+                let should_parse = real_path.to_string_lossy().ends_with(".html");
+                let mut writer = BufWriter::new(file);
+                let mut buf: Vec<u8> = Vec::new();
+
+                // Write file to disk
+                trace!("Writing at: {:?}", tmp_path);
+                BEING_STREAMED.add(1, &[]);
+                // let mut stream_span = TRACER.start("Stream");
+                while let Some(data) = stream.next().await {
+                    match data {
+                        Ok(data) => {
+                            TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
+                            let _ = writer.write_all(&data).await;
+                            // If we are going to parse this file later, we will save it
+                            // into memory as well as the disk.
+                            // We do this because the data here might be incomplete
+                            if should_parse {
+                                data.iter().for_each(|f| buf.push(*f));
+                            }
+                        }
+                        Err(err) => {
+                            error!("{err}")
+                        }
+                    }
+                }
+                let _ = writer.flush().await;
+                // rename the temp file into the real file name
+                if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
+                    error!(
+                        from = &*tmp_path.to_string_lossy(),
+                        to = &*real_path.to_string_lossy(),
+                        "Error renaming file: {}",
+                        err
+                    );
+                }
+
+                // stream_span.end();
+                BEING_STREAMED.add(-1, &[]);
+
+                // (If needed) Parse the file
+                if should_parse {
+                    BEING_PARSED.add(1, &[]);
+                    // let mut parsing_span = TRACER.start("Parsing");
+
+                    // Parse document and get relationships
+                    let sites = parser::parse(&site, &buf).await;
+                    // De-duplicate this list
+                    let prev_len = sites.len();
+                    let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
+                        set.insert(item);
+                        set
+                    });
+                    let de_dupe_sites: Vec<Website> = set.into_iter().collect();
+                    let diff = prev_len - de_dupe_sites.len();
+                    trace!("Saved {diff} from being entered into the db by de-duping");
+                    // Store all the other sites so that we can link to them.
+                    let _ = Website::store_all(de_dupe_sites, &db).await;
+
+                    // parsing_span.end();
+                    BEING_PARSED.add(-1, &[]);
+                } else {
+                    trace!(url = site.site.as_str(), "Parse = False");
+                }
+
+                // update self in db
+                site.crawled = true;
+                site.status_code = code.as_u16();
+                Website::store_all(vec![site.clone()], &db).await;
             }
-
-            // stream_span.end();
-            BEING_STREAMED.add(-1, &[]);
-
-            // (If needed) Parse the file
-            if should_parse {
-                BEING_PARSED.add(1, &[]);
-                // let mut parsing_span = TRACER.start("Parsing");
-
-                // Parse document and get relationships
-                let sites = parser::parse(&site, &buf).await;
-                // De-duplicate this list
-                let prev_len = sites.len();
-                let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
-                    set.insert(item);
-                    set
-                });
-                let de_dupe_sites: Vec<Website> = set.into_iter().collect();
-                let diff = prev_len - de_dupe_sites.len();
-                trace!("Saved {diff} from being entered into the db by de-duping");
-                // Store all the other sites so that we can link to them.
-                let _ = Website::store_all(de_dupe_sites, &db).await;
-
-                // parsing_span.end();
-                BEING_PARSED.add(-1, &[]);
-            } else {
-                trace!(url = site.site.as_str(), "Parse = False");
-            }
-
-            // update self in db
-            site.crawled = true;
-            site.status_code = code.as_u16();
-            Website::store_all(vec![site.clone()], &db).await;
         }
     } else {
         error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
     }
-    
+
     // process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }
@@ -311,11 +353,11 @@ fn load_tracing(config: &Config) -> SdkTracerProvider {
 }
 
 fn load_logging(config: &Config) {
-// let otlp_log = opentelemetry_otlp::LogExporter::builder()
-//     .with_tonic()
-//     .with_endpoint(endpoint)
-//     .build()
-//     .unwrap();
+    // let otlp_log = opentelemetry_otlp::LogExporter::builder()
+    //     .with_tonic()
+    //     .with_endpoint(endpoint)
+    //     .build()
+    //     .unwrap();
     // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
     //     .with_simple_exporter(otlp_log)
     //     .build();
@@ -336,7 +378,7 @@ fn load_logging(config: &Config) {
             .with_file(true)
             .json()
             .with_writer(writer)
-            .with_filter(filter)
+            .with_filter(filter),
     );
 
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
@@ -355,5 +397,3 @@ fn load_metrics(config: &Config) -> SdkMeterProvider {
         .build();
     metrics_provider
 }
-
-