diff --git a/src/filesystem.rs b/src/filesystem.rs
index 04e6ce1..3c57671 100644
--- a/src/filesystem.rs
+++ b/src/filesystem.rs
@@ -52,7 +52,7 @@ pub async fn check_file_length(file: &PathBuf) -> Option {
             }
         },
         Err(err) => {
-            error!("Failed to open file for testing... {}", err);
+            error!("Failed to open file for testing... {:?} {}", file, err);
         },
     }
     None
diff --git a/src/main.rs b/src/main.rs
index 848dd69..d5211e6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -22,7 +22,7 @@ use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
 use tokio::{
-    io::{AsyncWriteExt, BufWriter},
+    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
     sync::RwLock,
     task::JoinSet,
 };
@@ -183,7 +183,7 @@ async fn process_single_thread(
 #[instrument(skip(db, reqwest))]
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
-async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
+async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // METRICS
     debug!(url = &site.site.as_str(), "Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
@@ -251,26 +251,32 @@ async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
             }
         }
 
-        let update_in_db = async |mut site: Website| {
-            // update self in db
-            site.crawled = true;
-            site.status_code = code.as_u16();
-            Website::store_all(vec![site.clone()], &db).await;
-        };
+        // make sure that the file is good to go
+        if let Some(file) = filesystem::init(&tmp_path).await {
+            // Get body from response
+            // stream the response onto the disk
+            let mut stream = response.bytes_stream();
 
-        if skip_download {
-            trace!("Skipping download...");
-            update_in_db(site).await;
-        } else {
-            // make sure that the file is good to go
-            if let Some(file) = filesystem::init(&tmp_path).await {
-                // Get body from response
-                // stream the response onto the disk
-                let mut stream = response.bytes_stream();
+            let should_parse = real_path.to_string_lossy().ends_with(".html");
 
-                let should_parse = real_path.to_string_lossy().ends_with(".html");
+            let mut buf: Vec<u8> = Vec::new();
+
+            if skip_download && should_parse {
+                // since we are skipping the download we will just read the file off the disk to
+                // parse it
+                if let Ok(mut file) = tokio::fs::OpenOptions::new()
+                    .read(true)
+                    .open(&real_path).await
+                {
+                    if let Err(err) = file.read_to_end(&mut buf).await {
+                        warn!("Failed to read file off disk for parsing, {}", err);
+                    }
+                }
+            }
+
+            // !!!DOWNLOADING TIME!!!
+            if !skip_download {
                 let mut writer = BufWriter::new(file);
-                let mut buf: Vec<u8> = Vec::new();
 
                 // Write file to disk
                 trace!("Writing at: {:?}", tmp_path);
@@ -306,37 +312,38 @@ async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
 
                 // stream_span.end();
                 BEING_STREAMED.add(-1, &[]);
-
-                // (If needed) Parse the file
-                if should_parse {
-                    BEING_PARSED.add(1, &[]);
-                    // let mut parsing_span = TRACER.start("Parsing");
-
-                    // Parse document and get relationships
-                    let sites = parser::parse(&site, &buf).await;
-                    // De-duplicate this list
-                    let prev_len = sites.len();
-                    let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
-                        set.insert(item);
-                        set
-                    });
-                    let de_dupe_sites: Vec<Website> = set.into_iter().collect();
-                    let diff = prev_len - de_dupe_sites.len();
-                    trace!("Saved {diff} from being entered into the db by de-duping");
-                    // Store all the other sites so that we can link to them.
-                    let _ = Website::store_all(de_dupe_sites, &db).await;
-
-                    // parsing_span.end();
-                    BEING_PARSED.add(-1, &[]);
-                } else {
-                    trace!(url = site.site.as_str(), "Parse = False");
-                }
-
-                // update self in db
-                update_in_db(site).await;
             }
-        }
 
+            // (If needed) Parse the file
+            if should_parse {
+                BEING_PARSED.add(1, &[]);
+                // let mut parsing_span = TRACER.start("Parsing");
+
+                // Parse document and get relationships
+                let sites = parser::parse(&site, &buf).await;
+                // De-duplicate this list
+                let prev_len = sites.len();
+                let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
+                    set.insert(item);
+                    set
+                });
+                let de_dupe_sites: Vec<Website> = set.into_iter().collect();
+                let diff = prev_len - de_dupe_sites.len();
+                trace!("Saved {diff} from being entered into the db by de-duping");
+                // Store all the other sites so that we can link to them.
+                let _ = Website::store_all(de_dupe_sites, &db).await;
+
+                // parsing_span.end();
+                BEING_PARSED.add(-1, &[]);
+            } else {
+                trace!(url = site.site.as_str(), "Parse = False");
+            }
+
+            // update self in db
+            site.crawled = true;
+            site.status_code = code.as_u16();
+            Website::store_all(vec![site.clone()], &db).await;
+        }
     } else {
         error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
     }