From 9aa2d9ce22ce0887725e735e73e8bc3e99cba5a6 Mon Sep 17 00:00:00 2001
From: Rushmore75
Date: Tue, 15 Apr 2025 13:06:53 -0600
Subject: [PATCH 1/3] code settings

---
 .vscode/launch.json   | 19 ++++++++-----------
 .vscode/settings.json |  2 +-
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 43deeb5..cfb5506 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -7,18 +7,15 @@
         {
             "type": "lldb",
             "request": "launch",
-            "name": "Debug executable 'surreal_spider'",
-            "env": {
-                "RUST_LOG": "surreal_spider=trace,reqwest=info",
-            },
+            "name": "Debug executable 'internet_mapper'",
             "cargo": {
                 "args": [
                     "build",
-                    "--bin=surreal_spider",
-                    "--package=surreal_spider"
+                    "--bin=internet_mapper",
+                    "--package=internet_mapper"
                 ],
                 "filter": {
-                    "name": "surreal_spider",
+                    "name": "internet_mapper",
                     "kind": "bin"
                 }
             },
@@ -28,16 +25,16 @@
         {
             "type": "lldb",
             "request": "launch",
-            "name": "Debug unit tests in executable 'surreal_spider'",
+            "name": "Debug unit tests in executable 'internet_mapper'",
            "cargo": {
                 "args": [
                     "test",
                     "--no-run",
-                    "--bin=surreal_spider",
-                    "--package=surreal_spider"
+                    "--bin=internet_mapper",
+                    "--package=internet_mapper"
                 ],
                 "filter": {
-                    "name": "surreal_spider",
+                    "name": "internet_mapper",
                     "kind": "bin"
                 }
             },
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 5196957..df2b276 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -3,6 +3,6 @@
         "creds",
         "reqwest",
         "rustls",
-        "surql"
+        "surql",
     ]
 }
\ No newline at end of file
-- 
2.47.2

From bdb1094a307663685fc03900445a1b7bb9993af7 Mon Sep 17 00:00:00 2001
From: Rushmore75
Date: Tue, 15 Apr 2025 13:07:47 -0600
Subject: [PATCH 2/3] stream data to the disk

---
 Cargo.lock        |   1 +
 Cargo.toml        |   3 +-
 src/filesystem.rs |  52 +++++++++++++-------
 src/main.rs       | 122 +++++++++++++++++++++++++++-------------------
 src/parser.rs     |  49 +++++++++++--------
 5 files changed, 138 insertions(+), 89 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c55ee61..a60854e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1966,6 +1966,7 @@ name = "internet_mapper"
 version = "0.1.0"
 dependencies = [
  "base64 0.22.1",
+ "futures-util",
  "html5ever 0.29.1",
  "metrics",
  "metrics-exporter-prometheus",
diff --git a/Cargo.toml b/Cargo.toml
index fa499e4..a2ba16f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,12 +5,13 @@ edition = "2021"
 
 [dependencies]
 base64 = "0.22.1"
+futures-util = "0.3.31"
 html5ever = "0.29"
 metrics = "0.24.1"
 metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
 # minio = "0.1.0"
 minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"}
-reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls"] }
+reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
 serde = { version = "1.0", features = ["derive"] }
 surrealdb = "2.2"
 tokio = { version="1.41.0", features = ["full"] }
diff --git a/src/filesystem.rs b/src/filesystem.rs
index 81ad057..20d6493 100644
--- a/src/filesystem.rs
+++ b/src/filesystem.rs
@@ -1,14 +1,10 @@
-use std::{ffi::OsStr, path::PathBuf};
+use std::{ffi::OsStr, io::ErrorKind, path::PathBuf};
 
 use tokio::fs;
-use tracing::{debug, error, instrument, trace, warn};
+use tracing::{error, trace};
 use url::Url;
 
-#[instrument(skip(data))]
-/// Returns whether or not the saved file should be parsed.
-/// If the file is just data, like an image, it doesn't need to be parsed.
-/// If it's html, then it does need to be parsed.
-pub async fn store(data: &str, url: &Url) -> bool {
+pub fn as_path(url: &Url) -> PathBuf {
     // extract data from url to save it accurately
     let url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
 
@@ -24,20 +20,42 @@ pub async fn store(data: &str, url: &Url) -> bool {
         (url_path.clone(), "index.html".into())
     };
 
-    let should_parse = filename.ends_with(".html");
+    let mut path = PathBuf::new();
+    path = path.join(basepath);
+    path = path.join(filename);
 
-    debug!("Writing at: {:?} {:?}", basepath, filename);
+    path
+}
 
-    // create the folders
-    if let Err(err) = fs::create_dir_all(&basepath).await {
-        error!("Dir creation: {err} {:?}", basepath);
-    } else {
-        if let Err(err) = fs::write(&basepath.join(filename), data).await {
-            error!("File creation: {err} {:?}", url_path);
+pub async fn init(filename: &PathBuf) -> Option<fs::File> {
+    let file = async || tokio::fs::OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(&filename).await;
+
+    match file().await {
+        Ok(ok) => Some(ok),
+        Err(err) => {
+            // the file/folder isn't found
+            if err.kind() == ErrorKind::NotFound {
+                if let Some(parent ) = &filename.parent() {
+                    // create the folders
+                    if let Err(err) = fs::create_dir_all(&parent).await {
+                        error!("Dir creation: {err} {:?}", filename);
+                        eprintln!("{}", err)
+                    } else if let Ok(ok) = file().await {
+                        return Some(ok);
+                    }
+                } else {
+                    error!("Couldn't get file's parents: {:?}", &filename);
+                }
+            } else {
+                error!("File creation: {err} {:?}", filename);
+            }
+            // we don't care about other errors, we can't/shouldn't fix them
+            None
         }
     }
-
-    should_parse
 }
 
 fn valid_file_extension(take: &&OsStr) -> bool {
diff --git a/src/main.rs b/src/main.rs
index 36ecb59..bdfc55c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,17 @@
 #![feature(ip_from)]
+#![feature(async_closure)]
+#![warn(clippy::expect_used)]
+#![deny(clippy::unwrap_used)]
 
 extern crate html5ever;
 
+use futures_util::StreamExt;
+
 use std::{
-    collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}
+    collections::HashSet,
+    fs::File,
+    io::Read,
+    net::{IpAddr, Ipv4Addr},
 };
 
 use db::{connect, Website};
@@ -11,21 +19,19 @@ use metrics::{counter, gauge};
 use metrics_exporter_prometheus::PrometheusBuilder;
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::task::JoinSet;
+use tokio::{io::AsyncWriteExt, task::JoinSet};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
 mod db;
-mod parser;
 mod filesystem;
+mod parser;
 
 const GET_METRIC: &str = "total_gets";
 const GET_IN_FLIGHT: &str = "gets_in_flight";
 const SITES_CRAWLED: &str = "pages_crawled";
 const BEING_PROCESSED: &str = "pages_being_processed";
 
-const BATCH_SIZE: usize = 2;
-
 #[derive(Deserialize)]
 struct Config {
     surreal_ns: String,
@@ -37,6 +43,7 @@ struct Config {
     crawl_filter: String,
     start_url: String,
     budget: usize,
+    batch_size: usize,
 }
 
 #[tokio::main]
@@ -111,7 +118,8 @@ async fn main() {
     let span = trace_span!("Loop");
     let span = span.enter();
     while crawled < config.budget {
-        let uncrawled = get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone()).await;
+        let uncrawled =
+            get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
         if uncrawled.is_empty() {
             info!("Had more budget but finished crawling everything.");
             return;
@@ -137,12 +145,13 @@
     }
     drop(span);
 
-    if let Ok(mut ok) = db.query("count(select id from website where crawled = true)").await {
+    if let Ok(mut ok) = db
+        .query("count(select id from website where crawled = true)")
+        .await
+    {
         let res = ok.take::<Option<usize>>(0);
-        if let Ok(i) = res {
-            if let Some(n) = i {
-                info!("Total crawled pages now equals {n}");
-            }
+        if let Ok(Some(n)) = res {
+            info!("Total crawled pages now equals {n}");
         }
     }
 
@@ -153,7 +162,6 @@
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
-    // METRICS
     trace!("Process: {}", &site.site);
 
     // Build the request
@@ -165,60 +173,75 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 
     // Send the http request (get)
     if let Ok(response) = request_builder.send().await {
-
-        // TODO if this will fail if the object we are downloading is
-        // larger than the memory of the device it's running on.
-        // We should store it *as* we download it then parse it in-place.
         // Get body from response
-        let data = response
-            .text()
-            .await
-            .expect("Failed to read http response's body!");
-        // METRICS
-        g.decrement(1);
-        counter!(GET_METRIC).increment(1);
+        let path = filesystem::as_path(&site.site);
 
-        // Store document
-        let should_parse = filesystem::store(&data, &site.site).await;
+        // make sure that the file is good to go
+        if let Some(mut file) = filesystem::init(&path).await {
+            let should_parse = path.to_string_lossy().ends_with(".html");
+            let mut buf: Vec<u8> = Vec::new();
 
-        if should_parse {
-            // Parse document and get relationships
-            let sites = parser::parse(&site, &data).await;
-
-            // De-duplicate this list
-            let prev_len = sites.len();
-            let set = sites.into_iter().fold(HashSet::new(), |mut set,item| {
-                set.insert(item);
-                set
-            });
-            let de_dupe_sites: Vec<Website> = set.into_iter().collect();
-            let diff = prev_len - de_dupe_sites.len();
-            trace!("Saved {diff} from being entered into the db by de-duping");
+            // stream the response onto the disk
+            let mut stream = response.bytes_stream();
+            while let Some(data) = stream.next().await {
+                match data {
+                    Ok(data) => {
+                        debug!("Writing at: {:?}", path);
+                        let _ = file.write_all(&data).await;
+                        // If we are going to parse this file later, we will save it
+                        // into memory as well as the disk.
+                        if should_parse {
+                            data.iter().for_each(|f| buf.push(*f));
+                        }
+                    },
+                    Err(err) => {
+                        eprintln!("{}", err)
+                    },
+                }
+            }
 
-            // Store all the other sites so that we can link to them.
-            let _ = Website::store_all(de_dupe_sites, &db).await;
+            if should_parse {
+                // Parse document and get relationships
+                let sites = parser::parse(&site, &buf).await;
+                // De-duplicate this list
+                let prev_len = sites.len();
+                let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
+                    set.insert(item);
+                    set
+                });
+                let de_dupe_sites: Vec<Website> = set.into_iter().collect();
+                let diff = prev_len - de_dupe_sites.len();
+                trace!("Saved {diff} from being entered into the db by de-duping");
+                // Store all the other sites so that we can link to them.
+                let _ = Website::store_all(de_dupe_sites, &db).await;
+            }
+
+            // METRICS
+            g.decrement(1);
+            counter!(GET_METRIC).increment(1);
+
+            // update self in db
+            site.set_crawled();
+            Website::store_all(vec![site], &db).await;
+        } else {
+            error!("File failed to cooperate: {:?}", path);
         }
-
-        // update self in db
-        site.set_crawled();
-        Website::store_all(vec![site], &db).await;
-
     } else {
         error!("Failed to get: {}", &site.site);
     }
 }
 
 /// Returns uncrawled links
-#[instrument(skip(db))]
+#[instrument(skip(db, config))]
 async fn get_uncrawled_links(
     db: &Surreal<Client>,
     mut count: usize,
     filter: String,
+    config: &Config,
 ) -> Vec<Website> {
-
-    if count > BATCH_SIZE {
-        count = BATCH_SIZE;
+    if count > config.batch_size {
+        count = config.batch_size;
     }
 
     debug!("Getting uncrawled links");
 
@@ -233,4 +256,3 @@ async fn get_uncrawled_links(
         .take(0)
         .expect("Returned websites couldn't be parsed")
 }
-
diff --git a/src/parser.rs b/src/parser.rs
index c1e87e2..61469d5 100644
--- a/src/parser.rs
+++ b/src/parser.rs
@@ -1,5 +1,4 @@
 use std::default::Default;
-use std::str::FromStr;
 
 use html5ever::tokenizer::{BufferQueue, TokenizerResult};
 use html5ever::tokenizer::{StartTag, TagToken};
@@ -63,29 +62,34 @@ impl TokenSink for Website {
 
 #[instrument(skip_all)]
 /// Parses the passed site and returns all the sites it links to.
-pub async fn parse(site: &Website, data: &str) -> Vec<Website> {
+pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
+    debug!("Parsing {}", site.site.to_string());
     // prep work
     let mut other_sites: Vec<Website> = Vec::new();
 
     // change data into something that can be tokenized
-    let chunk = Tendril::from_str(data).expect("Failed to parse string into Tendril!");
-    // create buffer of tokens and push our input into it
-    let token_buffer = BufferQueue::default();
-    token_buffer.push_back(
-        chunk
-            .try_reinterpret::<fmt::UTF8>()
-            .expect("Failed to reinterpret chunk!"),
-    );
-    // create the tokenizer
-    let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
+    let s: Result<Tendril<fmt::UTF8>, ()> = Tendril::try_from_byte_slice(data);
+    if let Ok(chunk) = s {
+        // create buffer of tokens and push our input into it
+        let token_buffer = BufferQueue::default();
+        token_buffer.push_back(
+            chunk
+                .try_reinterpret::<fmt::UTF8>()
+                .expect("Failed to reinterpret chunk!"),
+        );
+        // create the tokenizer
+        let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
 
-    // go thru buffer
-    while let TokenizerResult::Script(mut sites) = tokenizer.feed(&token_buffer) {
-        other_sites.append(&mut sites);
-        // other_sites.push(sites);
+        // go thru buffer
+        while let TokenizerResult::Script(mut sites) = tokenizer.feed(&token_buffer) {
+            other_sites.append(&mut sites);
+            // other_sites.push(sites);
+        }
+        assert!(token_buffer.is_empty());
+        tokenizer.end();
+    } else {
+        warn!("Tendril failed to parse on: {}", site.site.to_string());
     }
-    assert!(token_buffer.is_empty());
-    tokenizer.end();
 
     other_sites
 }
@@ -107,7 +111,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
             match Url::parse(&format!("{scheme}://{}", link)) {
                 Ok(url) => Some(url),
                 Err(err) => {
-                    error!("Failed parsing realative scheme url: {}", err);
+                    error!("Failed parsing relative scheme url: {}", err);
                     None
                 }
             }
@@ -117,10 +121,13 @@
         match e {
             url::ParseError::RelativeUrlWithoutBase => {
                 // Is: scheme://host:port
-                let origin = parent.origin().ascii_serialization();
+                let mut origin = parent.origin().ascii_serialization();
+                if !origin.ends_with('/') && !link.starts_with('/') {
+                    origin += "/";
+                }
                 let url = origin.clone() + link;
-                trace!("Built `{url}` from `{origin} + {}`", link.to_string());
+                trace!("Built `{url}` from `{origin} + `{}`", link.to_string());
 
                 if let Ok(url) = Url::parse(&url) {
                     trace!("Saved relative url `{}` AS: `{}`", link, url);
 
-- 
2.47.2

From 9bfa8f91084ae31454c0f231d9ea243169d1b153 Mon Sep 17 00:00:00 2001
From: Rushmore75
Date: Tue, 15 Apr 2025 13:38:28 -0600
Subject: [PATCH 3/3] batch_size

---
 Crawler.toml | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/Crawler.toml b/Crawler.toml
index 0048238..e57d08b 100644
--- a/Crawler.toml
+++ b/Crawler.toml
@@ -3,9 +3,12 @@ surreal_url = "localhost:8000"
 surreal_username = "root"
 surreal_password = "root"
 surreal_ns = "test"
-surreal_db = "v1.19.5"
+surreal_db = "v1.20.3"
 
 # Crawler config
-crawl_filter = "en.wikipedia.org"
-start_url = "https://en.wikipedia.org"
+crawl_filter = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
+# crawl_filter = "https://oliveratkinson.net"
+start_url = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
+# start_url = "https://oliveratkinson.net"
 budget = 100
+batch_size = 5
-- 
2.47.2
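
A minimal sketch of the streaming pattern that patch 2 introduces in process(): the response body is written to disk chunk by chunk through reqwest's bytes_stream() (hence the new "stream" feature and the futures-util dependency) instead of being read fully into memory first. The URL and output path below are placeholders, not values taken from Crawler.toml.

    use futures_util::StreamExt;
    use tokio::io::AsyncWriteExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder URL and output path; the crawler derives the real path from the URL.
        let url = "https://example.com/index.html";
        let out = "downloaded/example.com/index.html";

        let response = reqwest::Client::new().get(url).send().await?;

        // Make sure the parent folders exist, then open the file for appending.
        if let Some(parent) = std::path::Path::new(out).parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let mut file = tokio::fs::OpenOptions::new()
            .append(true)
            .create(true)
            .open(out)
            .await?;

        // Write each chunk as it arrives instead of holding the whole body in memory.
        let mut stream = response.bytes_stream();
        while let Some(chunk) = stream.next().await {
            file.write_all(&chunk?).await?;
        }
        Ok(())
    }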
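
The parser change in the same patch also adjusts relative-link handling: when Url::parse reports RelativeUrlWithoutBase, the link is appended to the parent page's origin with a single slash between them. A small sketch of that joining rule, using made-up parent and link values:

    use url::Url;

    fn main() {
        let parent = Url::parse("https://example.com/some/page.html").expect("valid url");
        let link = "assets/style.css";

        match Url::parse(link) {
            Err(url::ParseError::RelativeUrlWithoutBase) => {
                // The origin serializes without a trailing slash ("https://example.com"),
                // so insert one unless the link already starts with '/'.
                let mut origin = parent.origin().ascii_serialization();
                if !origin.ends_with('/') && !link.starts_with('/') {
                    origin += "/";
                }
                let url = Url::parse(&(origin + link)).expect("valid joined url");
                println!("{url}"); // https://example.com/assets/style.css
            }
            other => println!("not a base-less relative link: {other:?}"),
        }
    }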