diff --git a/.gitignore b/.gitignore
index dbaa546..87fb73a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@
 perf.data
 flamegraph.svg
 perf.data.old
-/docker/logs/*
\ No newline at end of file
+/docker/logs/*
+/downloaded
\ No newline at end of file
diff --git a/Crawler.toml b/Crawler.toml
index d536db5..7072e33 100644
--- a/Crawler.toml
+++ b/Crawler.toml
@@ -3,13 +3,7 @@ surreal_url = "localhost:8000"
 surreal_username = "root"
 surreal_password = "root"
 surreal_ns = "test"
-surreal_db = "v1.17"
-
-# Minio config
-s3_bucket = "v1.17"
-s3_url = "http://localhost:9000"
-s3_access_key = "Ok6s9uQEvKrqRoGZdacm"
-s3_secret_key = "qubeSkP787c7QZu4TvtnuwPTGIAq6ETPupCxvv6K"
+surreal_db = "v1.18.1"
 
 # Crawler config
 crawl_filter = "en.wikipedia.com"
diff --git a/docker/prometheus.yaml b/docker/prometheus.yaml
index cb43a89..9133e15 100644
--- a/docker/prometheus.yaml
+++ b/docker/prometheus.yaml
@@ -7,7 +7,8 @@ scrape_configs:
     static_configs:
       # change this your machine's ip, localhost won't work
       # because localhost refers to the docker container.
-      - targets: ['172.20.239.48:2500']
+      # - targets: ['172.20.239.48:2500']
+      - targets: ['192.168.8.209:2500']
   - job_name: loki
     static_configs:
       - targets: ['loki:3100']
diff --git a/src/db.rs b/src/db.rs
index deceb27..03a1850 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,16 +1,11 @@
-use base64::{
-    alphabet,
-    engine::{self, general_purpose},
-    Engine,
-};
 use metrics::counter;
-use serde::{ser::SerializeStruct, Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
 use std::{fmt::Debug, sync::LazyLock, time::Instant};
 use surrealdb::{
     engine::remote::ws::{Client, Ws},
     opt::auth::Root,
     sql::Thing,
-    Response, Surreal,
+    Surreal,
 };
 use tokio::sync::Mutex;
 use tracing::{error, instrument, trace, warn};
@@ -18,17 +13,12 @@ use url::Url;
 
 use crate::Config;
 
-// static LOCK: LazyLock<Arc<Mutex<bool>>> = LazyLock::new(|| Arc::new(Mutex::new(true)));
 static LOCK: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(true));
 
-const CUSTOM_ENGINE: engine::GeneralPurpose =
-    engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);
-
 const TIME_SPENT_ON_LOCK: &str = "surql_lock_waiting_ms";
 const STORE: &str = "surql_store_calls";
-const LINK: &str = "surql_link_calls";
 
-#[derive(Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
 pub struct Website {
     /// The url that this data is found at
     pub site: Url,
@@ -36,18 +26,6 @@ pub struct Website {
     pub crawled: bool,
 }
 
-impl Serialize for Website {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer {
-        let mut state = serializer.serialize_struct("Website", 2)?;
-        state.serialize_field("crawled", &self.crawled)?;
-        // to_string() calls the correct naming of site
-        state.serialize_field("site", &self.site.to_string())?;
-        state.end()
-    }
-}
-
 // manual impl to make tracing look nicer
 impl Debug for Website {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -73,68 +51,6 @@ impl Website {
         self.crawled = true
     }
 
-    pub fn get_url_as_string(site: &Url) -> String {
-        let domain = match site.domain() {
-            Some(s) => s.to_string(),
-            None => {
-                warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string());
-                "localhost".to_string()
-            }
-        };
-        let path = site.path();
-
-        domain + path
-    }
-
-    pub fn get_url_as_b64_path(site: &Url) -> String {
-        let domain = site.domain().unwrap_or("DOMAIN").to_string();
-        let path = &CUSTOM_ENGINE.encode(site.path());
-
-        domain + path
-    }
-
-    #[instrument(skip_all)]
-    pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {
-        let len = other.len();
-        if len == 0 {
-            return;
-        }
-
-        let from = &self.site;
-
-        // let to = other.site.to_string();
-        trace!("Linking {} pages to {from}", other.len());
-        counter!(LINK).increment(1);
-        match db
-            .query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)")
-            .bind(("in", from.clone()))
-            .bind(("out", other))
-            .await
-        {
-            Ok(mut e) => {
-                // The relate could technically "fail" (not relate anything), this just means that
-                // the query was ok.
-                let _: Response = e;
-                if let Ok(vec) = e.take(0) {
-                    let _: Vec<usize> = vec;
-                    if let Some(num) = vec.first() {
-                        if *num == len {
-                            trace!("Link for {from} OK - {num}/{len}");
-                            return;
-                        } else {
-                            error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e);
-                            return;
-                        }
-                    }
-                }
-                error!("Linking request succeeded but couldn't verify the results.");
-            }
-            Err(e) => {
-                error!("{}", e.to_string());
-            }
-        }
-    }
-
     // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
     // if already in the database as such or incoming data is TRUE.
     pub async fn store_all(all: Vec<Website>, db: &Surreal<Client>) -> Vec<Thing> {
diff --git a/src/filesystem.rs b/src/filesystem.rs
new file mode 100644
index 0000000..9a3f761
--- /dev/null
+++ b/src/filesystem.rs
@@ -0,0 +1,24 @@
+use std::path::PathBuf;
+
+use tokio::fs;
+use tracing::{error, instrument, trace};
+use url::Url;
+
+#[instrument(skip(data))]
+pub async fn store(data: &str, url: &Url) {
+    let path = PathBuf::from("./downloaded".to_string() + url.path());
+    let basepath = path.ancestors().skip(1).take(1).collect::<PathBuf>();
+    trace!("Built path: {:?} and base path: {:?}", &path, &basepath);
+    if let Err(err) = fs::create_dir_all(&basepath).await {
+        let ex = path.ancestors().fold(String::new(), |mut s, item| {
+            s += ", ";
+            s += &item.to_string_lossy().to_string();
+            s
+        });
+        error!("Dir creation: {err} {:?} {ex}", basepath);
+    } else {
+        if let Err(err) = fs::write(&path, data).await {
+            error!("File creation: {err} {:?}", path);
+        }
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index fcdd10b..c5c96cd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,24 +3,21 @@ extern crate html5ever;
 
 use std::{
-    fs::File,
-    io::Read,
-    net::{IpAddr, Ipv4Addr},
+    collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr},
     str::FromStr, time
 };
 
 use db::{connect, Website};
 use metrics::{counter, gauge};
 use metrics_exporter_prometheus::PrometheusBuilder;
-use s3::S3;
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
 use tokio::task::JoinSet;
-use tracing::{debug, error, info, instrument, trace, trace_span};
+use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
 mod db;
 mod parser;
-mod s3;
+mod filesystem;
 
 const GET_METRIC: &str = "total_gets";
 const GET_IN_FLIGHT: &str = "gets_in_flight";
@@ -35,11 +32,6 @@ struct Config {
     surreal_username: String,
     surreal_password: String,
 
-    s3_url: String,
-    s3_bucket: String,
-    s3_access_key: String,
-    s3_secret_key: String,
-
     crawl_filter: String,
     budget: usize,
 }
@@ -52,7 +44,9 @@ async fn main() {
         .open("./docker/logs/tracing.log")
         .expect("Couldn't make log file!");
 
-    let filter = EnvFilter::from_default_env();
+    let filter = EnvFilter::builder()
+        .with_default_directive(LevelFilter::DEBUG.into())
+        .from_env_lossy();
 
     let registry = Registry::default().with(
         fmt::layer()
@@ -75,7 +69,7 @@ async fn main() {
         .install()
.expect("failed to install recorder/exporter"); - debug!("Starting..."); + info!("Starting..."); // Would probably take these in as parameters from a cli let starting_url = "https://en.wikipedia.org/"; // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored. @@ -92,9 +86,6 @@ async fn main() { let db = connect(&config) .await .expect("Failed to connect to surreal, aborting."); - let s3 = S3::connect(&config) - .await - .expect("Failed to connect to minio, aborting.\n\nThis probably means you need to login to the minio console and get a new access key!\n\n(Probably here) http://localhost:9001/access-keys/new-account\n\n"); let reqwest = reqwest::Client::builder() // .use_rustls_tls() @@ -108,7 +99,7 @@ async fn main() { let pre_loop_span = span.enter(); // Download the site let site = Website::new(starting_url, false); - process(site, db.clone(), reqwest.clone(), s3.clone()).await; + process(site, db.clone(), reqwest.clone()).await; drop(pre_loop_span); @@ -126,20 +117,15 @@ async fn main() { info!("Had more budget but finished crawling everything."); return; } - debug!("Crawling {} pages...", uncrawled.len()); - - let span = trace_span!("Crawling"); - let _ = span.enter(); { let mut futures = JoinSet::new(); for site in uncrawled { gauge!(BEING_PROCESSED).increment(1); - futures.spawn(process(site, db.clone(), reqwest.clone(), s3.clone())); + futures.spawn(process(site, db.clone(), reqwest.clone())); // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32); // info!("Crawled {crawled} out of {budget} pages. ({percent})"); } - debug!("Joining {} futures...", futures.len()); let c = counter!(SITES_CRAWLED); // As futures complete runs code in while block @@ -152,13 +138,13 @@ async fn main() { } drop(span); - debug!("Done"); + info!("Done"); } -#[instrument(skip(db, s3, reqwest))] +#[instrument(skip(db, reqwest))] /// Downloads and crawls and stores a webpage. /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver -async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Client, s3: S3) { +async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Client) { // METRICS trace!("Process: {}", &site.site); @@ -182,7 +168,7 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien .await .expect("Failed to read http response's body!"); // Store document - s3.store(&data, &site.site).await; + filesystem::store(&data, &site.site).await; // Parse document and get relationships let sites = parser::parse(&site, &data).await; @@ -191,12 +177,19 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien site.set_crawled(); Website::store_all(vec![site], &db).await; - // Store all the other sites so that we can link to them. - // let mut links_to = Vec::new(); - let _ = Website::store_all(sites, &db).await; + // De-duplicate this list + let prev_len = sites.len(); + let set = sites.into_iter().fold(HashSet::new(), |mut set,item| { + set.insert(item); + set + }); + let de_dupe_sites: Vec = set.into_iter().collect(); + let diff = prev_len - de_dupe_sites.len(); + trace!("Saved {diff} from being entered into the db by de-duping"); + + // Store all the other sites so that we can link to them. 
+        let _ = Website::store_all(de_dupe_sites, &db).await;
 
-        // Make the database's links reflect the html links between sites
-        // site.links_to(others, &db).await;
     } else {
         error!("Failed to get: {}", &site.site);
     }
diff --git a/src/parser.rs b/src/parser.rs
index 3fc1276..49001e0 100644
--- a/src/parser.rs
+++ b/src/parser.rs
@@ -5,18 +5,22 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult};
 use html5ever::tokenizer::{StartTag, TagToken};
 use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
 use html5ever::{local_name, tendril::*};
-use tracing::instrument;
+use tracing::{debug, error, info, instrument, trace, warn};
+use url::Url;
 
 use crate::db::Website;
 
 impl TokenSink for Website {
     type Handle = Vec<Website>;
 
+    #[instrument(skip(token, _line_number))]
     fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
         match token {
             TagToken(tag) => {
                 if tag.kind == StartTag {
                     match tag.name {
+                        // this should be all the html
+                        // elements that have links
                         local_name!("a")
                         | local_name!("audio")
                         | local_name!("area")
@@ -31,23 +35,46 @@ impl TokenSink for Website {
                                 let attr_name = attr.name.local.to_string();
                                 if attr_name == "src" || attr_name == "href" || attr_name == "data"
                                 {
-                                    // Get clone of the current site object
-                                    let mut web = self.clone();
-
-                                    // Set url
-                                    let mut url = web.site;
-                                    url.set_fragment(None); // removes #xyz
-                                    let joined = url
-                                        .join(&attr.value)
-                                        .expect("Failed to join url during parsing!");
-                                    web.site = joined;
-
-                                    web.crawled = false;
-
-                                    links.push(web);
+                                    let url: Option<Url> = match Url::parse(&attr.value) {
+                                        Ok(ok) => {
+                                            trace!("Found `{}` in the html on `{}` tag", ok.to_string(), tag.name);
+                                            Some(ok)
+                                        },
+                                        Err(e) => {
+                                            if attr.value.eq(&Tendril::from_char('#')) {
+                                                trace!("Rejecting # url");
+                                                None
+                                            } else {
+                                                match e {
+                                                    url::ParseError::RelativeUrlWithoutBase => {
+                                                        let origin = self.site.origin().ascii_serialization();
+                                                        let url = origin.clone() + &attr.value;
+                                                        trace!("Built `{url}` from `{origin} + {}`", &attr.value.to_string());
+                                                        if let Ok(url) = Url::parse(&url) {
+                                                            trace!("Saved relative url `{}` AS: `{}`", &attr.value, url);
+                                                            Some(url)
+                                                        } else {
+                                                            error!("Failed to reconstruct a url from relative url: `{}` on site: `{}`", &attr.value, self.site.to_string());
+                                                            None
+                                                        }
+                                                    },
+                                                    _ => {
+                                                        error!("MISC error: {:?} {:?}", e, &attr.value);
+                                                        None
+                                                    },
+                                                }
+                                            }
+                                        },
+                                    };
+                                    if let Some(mut parsed) = url {
+                                        parsed.set_query(None);
+                                        parsed.set_fragment(None);
+                                        debug!("Final cleaned URL: `{}`", parsed.to_string());
+                                        let web = Website::new(&parsed.to_string(), false);
+                                        links.push(web);
+                                    }
                                 }
                             }
 
-                            return TokenSinkResult::Script(links);
                         }
                         local_name!("button") | local_name!("meta") | local_name!("iframe") => {
@@ -76,7 +103,7 @@ pub async fn parse(site: &Website, data: &str) -> Vec<Website> {
     token_buffer.push_back(
         chunk
             .try_reinterpret::<fmt::UTF8>()
-            .expect("Failed to reinterprt chunk!"),
+            .expect("Failed to reinterpret chunk!"),
     );
     // create the tokenizer
     let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
diff --git a/src/s3.rs b/src/s3.rs
deleted file mode 100644
index 4efc2b4..0000000
--- a/src/s3.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use metrics::counter;
-use minio::s3::{
-    args::{BucketExistsArgs, MakeBucketArgs},
-    client::ClientBuilder,
-    creds::StaticProvider,
-    error::Error,
-    http::BaseUrl,
-    Client,
-};
-use tracing::{instrument, trace, warn};
-use url::Url;
-
-use crate::{db::Website, Config};
-
-const S3_ROUND_TRIP_METRIC: &str = "s3_trips";
-
-#[derive(Clone)]
-pub struct S3 {
-    bucket_name: String,
-    client: Client,
-}
-
-impl S3 {
-    #[instrument(skip_all, name = "S3")]
-    pub async fn connect(config: &Config) -> Result<Self, Error> {
-        let base_url = config
-            .s3_url
-            .parse::<BaseUrl>()
-            .expect("Failed to parse url into BaseUrl");
-
-        let static_provider =
-            StaticProvider::new(&config.s3_access_key, &config.s3_secret_key, None);
-
-        let client = ClientBuilder::new(base_url)
-            .provider(Some(Box::new(static_provider)))
-            .build()?;
-
-        trace!("Checking bucket...");
-        let exists = client
-            .bucket_exists(
-                &BucketExistsArgs::new(&config.s3_bucket)
-                    .expect("Failed to check if bucket exists"),
-            )
-            .await?;
-        counter!(S3_ROUND_TRIP_METRIC).increment(1);
-
-        if !exists {
-            trace!("Creating bucket...");
-            client
-                .make_bucket(
-                    &MakeBucketArgs::new(&config.s3_bucket).expect("Failed to create bucket!"),
-                )
-                .await?;
-        }
-        counter!(S3_ROUND_TRIP_METRIC).increment(1);
-
-        trace!("Connection successful");
-
-        Ok(Self {
-            bucket_name: config.s3_bucket.to_owned(),
-            client,
-        })
-    }
-
-    #[instrument(name = "s3_store", skip_all)]
-    pub async fn store(&self, data: &str, url: &Url) {
-        let counter = counter!(S3_ROUND_TRIP_METRIC);
-
-        let filename = Website::get_url_as_string(url);
-        trace!("Storing {} as {filename}", url.to_string());
-
-        counter.increment(1);
-        match &self
-            .client
-            .put_object_content(&self.bucket_name, &filename, data.to_owned())
-            .send()
-            .await
-        {
-            Ok(_) => {}
-            Err(err) => match err {
-                Error::InvalidObjectName(_) => {
-                    // This code will really only run if the url has non-english chars
-                    warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try.");
-
-                    let filename: String = Website::get_url_as_b64_path(url);
-
-                    counter.increment(1);
-                    let _ = &self
-                        .client
-                        .put_object_content(&self.bucket_name, &filename, data.to_owned())
-                        .send()
-                        .await
-                        .unwrap();
-                }
-                _ => {}
-            },
-        };
-    }
-}
-