fix the issue where nothing works
This commit is contained in:
@@ -52,7 +52,7 @@ pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Failed to open file for testing... {}", err);
|
error!("Failed to open file for testing... {:?} {}", file, err);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
|
103
src/main.rs
103
src/main.rs
@@ -22,7 +22,7 @@ use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use surrealdb::{engine::remote::ws::Client, Surreal};
|
use surrealdb::{engine::remote::ws::Client, Surreal};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncWriteExt, BufWriter},
|
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
|
||||||
sync::RwLock,
|
sync::RwLock,
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
};
|
};
|
||||||
@@ -183,7 +183,7 @@ async fn process_single_thread(
|
|||||||
#[instrument(skip(db, reqwest))]
|
#[instrument(skip(db, reqwest))]
|
||||||
/// Downloads and crawls and stores a webpage.
|
/// Downloads and crawls and stores a webpage.
|
||||||
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
|
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
|
||||||
async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
|
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
|
||||||
// METRICS
|
// METRICS
|
||||||
debug!(url = &site.site.as_str(), "Process: {}", &site.site);
|
debug!(url = &site.site.as_str(), "Process: {}", &site.site);
|
||||||
BEING_PROCESSED.add(1, &[]);
|
BEING_PROCESSED.add(1, &[]);
|
||||||
@@ -251,26 +251,32 @@ async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let update_in_db = async |mut site: Website| {
|
// make sure that the file is good to go
|
||||||
// update self in db
|
if let Some(file) = filesystem::init(&tmp_path).await {
|
||||||
site.crawled = true;
|
// Get body from response
|
||||||
site.status_code = code.as_u16();
|
// stream the response onto the disk
|
||||||
Website::store_all(vec![site.clone()], &db).await;
|
let mut stream = response.bytes_stream();
|
||||||
};
|
|
||||||
|
|
||||||
if skip_download {
|
let should_parse = real_path.to_string_lossy().ends_with(".html");
|
||||||
trace!("Skipping download...");
|
|
||||||
update_in_db(site).await;
|
|
||||||
} else {
|
|
||||||
// make sure that the file is good to go
|
|
||||||
if let Some(file) = filesystem::init(&tmp_path).await {
|
|
||||||
// Get body from response
|
|
||||||
// stream the response onto the disk
|
|
||||||
let mut stream = response.bytes_stream();
|
|
||||||
|
|
||||||
let should_parse = real_path.to_string_lossy().ends_with(".html");
|
let mut buf: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
|
if skip_download && should_parse {
|
||||||
|
// since we are skipping the download we will just read the file off the disk to
|
||||||
|
// parse it
|
||||||
|
if let Ok(mut file) = tokio::fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(&real_path).await
|
||||||
|
{
|
||||||
|
if let Err(err) = file.read_to_end(&mut buf).await {
|
||||||
|
warn!("Failed to read file off disk for parsing, {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// !!!DOWNLOADING TIME!!!
|
||||||
|
if !skip_download {
|
||||||
let mut writer = BufWriter::new(file);
|
let mut writer = BufWriter::new(file);
|
||||||
let mut buf: Vec<u8> = Vec::new();
|
|
||||||
|
|
||||||
// Write file to disk
|
// Write file to disk
|
||||||
trace!("Writing at: {:?}", tmp_path);
|
trace!("Writing at: {:?}", tmp_path);
|
||||||
@@ -306,37 +312,38 @@ async fn process(site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
|
|||||||
|
|
||||||
// stream_span.end();
|
// stream_span.end();
|
||||||
BEING_STREAMED.add(-1, &[]);
|
BEING_STREAMED.add(-1, &[]);
|
||||||
|
|
||||||
// (If needed) Parse the file
|
|
||||||
if should_parse {
|
|
||||||
BEING_PARSED.add(1, &[]);
|
|
||||||
// let mut parsing_span = TRACER.start("Parsing");
|
|
||||||
|
|
||||||
// Parse document and get relationships
|
|
||||||
let sites = parser::parse(&site, &buf).await;
|
|
||||||
// De-duplicate this list
|
|
||||||
let prev_len = sites.len();
|
|
||||||
let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
|
|
||||||
set.insert(item);
|
|
||||||
set
|
|
||||||
});
|
|
||||||
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
|
|
||||||
let diff = prev_len - de_dupe_sites.len();
|
|
||||||
trace!("Saved {diff} from being entered into the db by de-duping");
|
|
||||||
// Store all the other sites so that we can link to them.
|
|
||||||
let _ = Website::store_all(de_dupe_sites, &db).await;
|
|
||||||
|
|
||||||
// parsing_span.end();
|
|
||||||
BEING_PARSED.add(-1, &[]);
|
|
||||||
} else {
|
|
||||||
trace!(url = site.site.as_str(), "Parse = False");
|
|
||||||
}
|
|
||||||
|
|
||||||
// update self in db
|
|
||||||
update_in_db(site).await;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
// (If needed) Parse the file
|
||||||
|
if should_parse {
|
||||||
|
BEING_PARSED.add(1, &[]);
|
||||||
|
// let mut parsing_span = TRACER.start("Parsing");
|
||||||
|
|
||||||
|
// Parse document and get relationships
|
||||||
|
let sites = parser::parse(&site, &buf).await;
|
||||||
|
// De-duplicate this list
|
||||||
|
let prev_len = sites.len();
|
||||||
|
let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
|
||||||
|
set.insert(item);
|
||||||
|
set
|
||||||
|
});
|
||||||
|
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
|
||||||
|
let diff = prev_len - de_dupe_sites.len();
|
||||||
|
trace!("Saved {diff} from being entered into the db by de-duping");
|
||||||
|
// Store all the other sites so that we can link to them.
|
||||||
|
let _ = Website::store_all(de_dupe_sites, &db).await;
|
||||||
|
|
||||||
|
// parsing_span.end();
|
||||||
|
BEING_PARSED.add(-1, &[]);
|
||||||
|
} else {
|
||||||
|
trace!(url = site.site.as_str(), "Parse = False");
|
||||||
|
}
|
||||||
|
|
||||||
|
// update self in db
|
||||||
|
site.crawled = true;
|
||||||
|
site.status_code = code.as_u16();
|
||||||
|
Website::store_all(vec![site.clone()], &db).await;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
|
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user