foss_storage #3

.gitignore (1 line changed)
							| @@ -5,3 +5,4 @@ perf.data | ||||
| flamegraph.svg | ||||
| perf.data.old | ||||
| /docker/logs/* | ||||
| /downloaded | ||||

Crawler.toml (13 lines changed)
							| @@ -3,14 +3,9 @@ surreal_url = "localhost:8000" | ||||
| surreal_username = "root" | ||||
| surreal_password = "root" | ||||
| surreal_ns = "test" | ||||
| surreal_db = "v1.17" | ||||
|  | ||||
| # Minio config | ||||
| s3_bucket = "v1.17" | ||||
| s3_url = "http://localhost:9000" | ||||
| s3_access_key = "Ok6s9uQEvKrqRoGZdacm" | ||||
| s3_secret_key = "qubeSkP787c7QZu4TvtnuwPTGIAq6ETPupCxvv6K" | ||||
| surreal_db = "v1.19.5" | ||||
|  | ||||
| # Crawler config | ||||
| crawl_filter = "en.wikipedia.com"  | ||||
| budget = 1000 | ||||
| crawl_filter = "en.wikipedia.org"  | ||||
| start_url = "https://en.wikipedia.org" | ||||
| budget = 100 | ||||
|   | ||||

README.md (36 lines changed)
							| @@ -2,13 +2,43 @@ | ||||
|  | ||||
| Crawls sites, saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links until the crawl budget is reached. It saves the data of each site in a minio database. | ||||
|  | ||||
| ## How to use | ||||
|  | ||||
| 1. Clone the repo and `cd` into it. | ||||
| 2. Build the repo with `cargo build -r` | ||||
| 3. Start the docker containers | ||||
| 	1. cd into the docker folder `cd docker` | ||||
| 	2. Bring up the docker containers `docker compose up -d` | ||||
| 4. From the project's root, edit the `Crawler.toml` file to your liking. | ||||
| 5. Run with `./target/release/internet_mapper` | ||||
|  | ||||
| You can view stats of the project at `http://<your-ip>:3000/dashboards` | ||||
|  | ||||
| ```bash | ||||
| # Untested script but probably works | ||||
| git clone https://git.oliveratkinson.net/Oliver/internet_mapper.git | ||||
| cd internet_mapper | ||||
|  | ||||
| cargo build -r | ||||
|  | ||||
| cd docker | ||||
| docker compose up -d | ||||
| cd .. | ||||
|  | ||||
| $EDITOR Crawler.toml | ||||
|  | ||||
| ./target/release/internet_mapper | ||||
|  | ||||
| ``` | ||||
|  | ||||
| ### TODO | ||||
|  | ||||
| - [ ] Domain filtering - prevent the crawler from going on alternate versions of wikipedia. | ||||
| - [x] Domain filtering - prevent the crawler from going on alternate versions of wikipedia. | ||||
| - [ ] Conditionally save content - based on filename or file contents | ||||
| - [x] GUI / TUI ? - Grafana | ||||
| - [x] Better asynchronous getting of the sites. Currently it all happens serially. | ||||
| - [ ] Allow for storing asynchronously | ||||
| - [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need | ||||
| - [x] Control crawler via config file (no recompilation needed) | ||||
|  | ||||
| 3/17/25: Took >1hr to crawl 100 pages | ||||
|  | ||||
| @@ -17,6 +47,8 @@ This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the tw | ||||
|  | ||||
| 3/20/25: Took 5min to crawl 1000 pages | ||||
|  | ||||
| 3/21/25: Took 3min to crawl 1000 pages | ||||
|  | ||||
| # About | ||||
|  | ||||
|  | ||||
|   | ||||
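The README above describes the crawl loop: pull a batch of uncrawled links, crawl them, and repeat until the budget is spent. Below is a minimal sketch of that batch/budget control flow; `get_uncrawled` and `crawl_one` are hypothetical stubs for the SurrealDB query and the download/parse step, not the crate's real API.

```rust
// Stand-alone sketch of the batch/budget loop; only the control flow mirrors the crawler.
const BATCH_SIZE: usize = 100;

// Stub: in the real crawler this is a SurrealDB query filtered by `crawl_filter`.
fn get_uncrawled(frontier: &mut Vec<String>, max: usize) -> Vec<String> {
    let take = max.min(frontier.len());
    frontier.drain(..take).collect()
}

// Stub: in the real crawler this downloads, stores, and parses the page.
fn crawl_one(url: &str) {
    println!("crawling {url}");
}

fn main() {
    let budget = 250;
    let mut crawled = 0;
    // Pretend the database already holds 500 uncrawled links.
    let mut frontier: Vec<String> = (0..500)
        .map(|i| format!("https://en.wikipedia.org/wiki/Page_{i}"))
        .collect();

    while crawled < budget {
        // Never request more than the remaining budget or one batch.
        let want = (budget - crawled).min(BATCH_SIZE);
        let batch = get_uncrawled(&mut frontier, want);
        if batch.is_empty() {
            // Budget left over, but nothing uncrawled remains.
            break;
        }
        for url in &batch {
            crawl_one(url);
        }
        crawled += batch.len();
    }
    println!("crawled {crawled} of a budget of {budget} pages");
}
```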
| @@ -14,22 +14,6 @@ services: | ||||
|       - --pass | ||||
|       - root | ||||
|       - rocksdb:/mydata/database.db | ||||
|   minio: | ||||
|     image: quay.io/minio/minio | ||||
|     ports: | ||||
|       - 9000:9000 | ||||
|       - 9001:9001 | ||||
|     environment: | ||||
|       - MINIO_ROOT_USER=root | ||||
|       - MINIO_ROOT_PASSWORD=an8charpassword | ||||
|       - MINIO_PROMETHEUS_AUTH_TYPE=public | ||||
|     volumes: | ||||
|       - minio_storage:/data | ||||
|     command: | ||||
|       - server | ||||
|       - /data | ||||
|       - --console-address | ||||
|       - ":9001" | ||||
|  | ||||
|   alloy: | ||||
|     image: grafana/alloy:latest | ||||
| @@ -82,4 +66,3 @@ volumes: | ||||
|   grafana_storage: | ||||
|   alloy_storage: | ||||
|   surrealdb_storage: | ||||
|   minio_storage: | ||||
|   | ||||
| @@ -8,13 +8,10 @@ scrape_configs: | ||||
|     # change this your machine's ip, localhost won't work | ||||
|     # because localhost refers to the docker container. | ||||
|       - targets: ['172.20.239.48:2500'] | ||||
|         #- targets: ['192.168.8.209:2500'] | ||||
|   - job_name: loki | ||||
|     static_configs: | ||||
|       - targets: ['loki:3100'] | ||||
|   - job_name: prometheus | ||||
|     static_configs: | ||||
|       - targets: ['localhost:9090'] | ||||
|   - job_name: minio | ||||
|     metrics_path: /minio/v2/metrics/cluster | ||||
|     static_configs: | ||||
|       - targets: ['minio:9000'] | ||||
|   | ||||

src/db.rs (108 lines changed)
							| @@ -1,34 +1,20 @@ | ||||
| use base64::{ | ||||
|     alphabet, | ||||
|     engine::{self, general_purpose}, | ||||
|     Engine, | ||||
| }; | ||||
| use metrics::counter; | ||||
| use serde::{ser::SerializeStruct, Deserialize, Serialize}; | ||||
| use std::{fmt::Debug, sync::LazyLock, time::Instant}; | ||||
| use std::fmt::Debug; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use surrealdb::{ | ||||
|     engine::remote::ws::{Client, Ws}, | ||||
|     opt::auth::Root, | ||||
|     sql::Thing, | ||||
|     Response, Surreal, | ||||
|     Surreal, | ||||
| }; | ||||
| use tokio::sync::Mutex; | ||||
| use tracing::{error, instrument, trace, warn}; | ||||
| use tracing::{error, instrument, trace}; | ||||
| use url::Url; | ||||
|  | ||||
| use crate::Config; | ||||
|  | ||||
| // static LOCK: LazyLock<Arc<Mutex<bool>>> = LazyLock::new(|| Arc::new(Mutex::new(true))); | ||||
| static LOCK: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(true)); | ||||
|  | ||||
| const CUSTOM_ENGINE: engine::GeneralPurpose = | ||||
|     engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD); | ||||
|  | ||||
| const TIME_SPENT_ON_LOCK: &str = "surql_lock_waiting_ms"; | ||||
| const STORE: &str = "surql_store_calls"; | ||||
| const LINK: &str = "surql_link_calls"; | ||||
|  | ||||
| #[derive(Deserialize, Clone)] | ||||
| #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] | ||||
| pub struct Website { | ||||
|     /// The url that this data is found at | ||||
|     pub site: Url, | ||||
| @@ -36,18 +22,6 @@ pub struct Website { | ||||
|     pub crawled: bool, | ||||
| } | ||||
|  | ||||
| impl Serialize for Website { | ||||
|     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> | ||||
|     where | ||||
|         S: serde::Serializer { | ||||
|             let mut state = serializer.serialize_struct("Website", 2)?; | ||||
|             state.serialize_field("crawled", &self.crawled)?; | ||||
|             // to_string() calls the correct naming of site | ||||
|             state.serialize_field("site", &self.site.to_string())?; | ||||
|             state.end() | ||||
|     } | ||||
| } | ||||
|  | ||||
| // manual impl to make tracing look nicer | ||||
| impl Debug for Website { | ||||
|     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||
| @@ -73,82 +47,13 @@ impl Website { | ||||
|         self.crawled = true | ||||
|     } | ||||
|  | ||||
|     pub fn get_url_as_string(site: &Url) -> String { | ||||
|         let domain = match site.domain() { | ||||
|             Some(s) => s.to_string(), | ||||
|             None => { | ||||
|                 warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string()); | ||||
|                 "localhost".to_string() | ||||
|             } | ||||
|         }; | ||||
|         let path = site.path(); | ||||
|  | ||||
|         domain + path | ||||
|     } | ||||
|  | ||||
|     pub fn get_url_as_b64_path(site: &Url) -> String { | ||||
|         let domain = site.domain().unwrap_or("DOMAIN").to_string(); | ||||
|         let path = &CUSTOM_ENGINE.encode(site.path()); | ||||
|  | ||||
|         domain + path | ||||
|     } | ||||
|  | ||||
|     #[instrument(skip_all)] | ||||
|     pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) { | ||||
|         let len = other.len(); | ||||
|         if len == 0 { | ||||
|             return; | ||||
|         } | ||||
|          | ||||
|         let from = &self.site; | ||||
|  | ||||
|         // let to = other.site.to_string(); | ||||
|         trace!("Linking {} pages to {from}", other.len()); | ||||
|         counter!(LINK).increment(1); | ||||
|         match db | ||||
|             .query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)") | ||||
|             .bind(("in", from.clone())) | ||||
|             .bind(("out", other)) | ||||
|             .await | ||||
|         { | ||||
|             Ok(mut e) => { | ||||
|                 // The relate could technically "fail" (not relate anything), this just means that | ||||
|                 // the query was ok. | ||||
|                 let _: Response = e; | ||||
|                 if let Ok(vec) = e.take(0) { | ||||
|                     let _: Vec<usize> = vec; | ||||
|                     if let Some(num) = vec.first() { | ||||
|                         if *num == len { | ||||
|                             trace!("Link for {from} OK - {num}/{len}"); | ||||
|                             return; | ||||
|                         } else { | ||||
|                             error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e); | ||||
|                             return; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 error!("Linking request succeeded but couldn't verify the results."); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 error!("{}", e.to_string()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE | ||||
|     // if already in the database as such or incoming data is TRUE. | ||||
|     #[instrument(skip(db))] | ||||
|     pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> { | ||||
|         counter!(STORE).increment(1); | ||||
|         let mut things = Vec::with_capacity(all.len()); | ||||
|  | ||||
|         // TODO this only allows for one thread to be in the database at a time. | ||||
|         // This is currently required since otherwise we get write errors. | ||||
|         // If the default `crawled` is set to false, we might not need to write any more | ||||
|         // than just the name. `accessed_at` is fun but not needed. | ||||
|         let now = Instant::now(); | ||||
|         let lock = LOCK.lock().await; | ||||
|         counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64); | ||||
|  | ||||
|         match db | ||||
|             .query( | ||||
|                 "INSERT INTO website $array | ||||
| @@ -169,7 +74,6 @@ impl Website { | ||||
|                 error!("{:?}", err); | ||||
|             } | ||||
|         } | ||||
|         drop(lock); | ||||
|         things | ||||
|     } | ||||
| } | ||||
|   | ||||
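The db.rs hunk above swaps the hand-written Serialize impl for derives and adds Eq, PartialEq, and Hash to Website, which is what lets main.rs de-duplicate parsed links through a HashSet. A small sketch of that, assuming the `url` crate:

```rust
use std::collections::HashSet;
use url::Url; // assumes the url crate

// Mirrors the shape of the commit's Website struct; Eq + Hash are what allow
// parsed links to be collected into a HashSet for de-duplication.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct Website {
    site: Url,
    crawled: bool,
}

fn main() {
    let a = Website {
        site: Url::parse("https://en.wikipedia.org/wiki/Rust").unwrap(),
        crawled: false,
    };
    let b = a.clone(); // the same link found twice on one page

    // Equivalent to the fold in main.rs: push everything into a set, then back into a Vec.
    let unique: Vec<Website> = vec![a, b]
        .into_iter()
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();
    assert_eq!(unique.len(), 1);
    println!("{} unique site(s) to store", unique.len());
}
```

The commit's struct also derives Serialize and Deserialize in place of the removed manual impl; that relies on the url crate's serde feature, under which a Url serializes as its string form, matching what the manual impl did.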

src/filesystem.rs (60 lines, new file)
							| @@ -0,0 +1,60 @@ | ||||
| use std::{ffi::OsStr, path::PathBuf}; | ||||
|  | ||||
| use tokio::fs; | ||||
| use tracing::{debug, error, instrument, trace, warn}; | ||||
| use url::Url; | ||||
|  | ||||
| #[instrument(skip(data))] | ||||
| /// Returns whether or not the saved file should be parsed. | ||||
| /// If the file is just data, like an image, it doesn't need to be parsed. | ||||
| /// If it's html, then it does need to be parsed. | ||||
| pub async fn store(data: &str, url: &Url) -> bool { | ||||
|     // extract data from url to save it accurately | ||||
|     let url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path()); | ||||
|  | ||||
|     // if it's a file | ||||
|     let (basepath, filename) = if url_path.extension().filter(valid_file_extension).is_some() { | ||||
|         // get everything up till the file | ||||
|         let basepath = url_path.ancestors().skip(1).take(1).collect::<PathBuf>(); | ||||
|         // get the file name | ||||
|         let filename = url_path.file_name().expect("This should exist").to_string_lossy(); | ||||
|         trace!("Save path: {:?} and base path: {:?}", &url_path, &basepath); | ||||
|         (basepath, filename.to_string()) | ||||
|     } else { | ||||
|         (url_path.clone(), "index.html".into()) | ||||
|     }; | ||||
|  | ||||
|     let should_parse = filename.ends_with(".html"); | ||||
|  | ||||
|     debug!("Writing at: {:?} {:?}", basepath, filename); | ||||
|  | ||||
|     // create the folders | ||||
|     if let Err(err) = fs::create_dir_all(&basepath).await { | ||||
|         error!("Dir creation: {err} {:?}", basepath); | ||||
|     } else { | ||||
|         if let Err(err) = fs::write(&basepath.join(filename), data).await { | ||||
|             error!("File creation: {err} {:?}", url_path); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     should_parse | ||||
| } | ||||
|  | ||||
| fn valid_file_extension(take: &&OsStr) -> bool { | ||||
|     let los = take.to_string_lossy(); | ||||
|     let all = los.split('.'); | ||||
|     match all.last() { | ||||
|         Some(s) => { | ||||
|             // FIXME it's worth noting that the dumb tlds like .zip are in here, | ||||
|             // which could cause problems | ||||
|             let all_domains = include_str!("tlds-alpha-by-domain.txt"); | ||||
|  | ||||
|             // check if it is a domain | ||||
|             match all_domains.lines().map(str::to_lowercase).find(|x| x==s.to_lowercase().as_str()) { | ||||
|                 Some(_) => false, | ||||
|                 None => true | ||||
|             } | ||||
|         }, | ||||
|         None => false, | ||||
|     } | ||||
| } | ||||
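filesystem::store maps each URL onto a path under ./downloaded/ and reports whether the saved file should be parsed (only .html is). Below is a simplified, synchronous sketch of that mapping, with the directory creation and file write left out; `looks_like_file` is a hypothetical stand-in for the TLD check against tlds-alpha-by-domain.txt.

```rust
use std::ffi::OsStr;
use std::path::PathBuf;
use url::Url; // assumes the url crate

// Computes where a response would be written and whether it should be parsed;
// the real function also creates the directories and writes the file.
fn plan_save(url: &Url) -> (PathBuf, String, bool) {
    let url_path = PathBuf::from(
        "./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path(),
    );

    let (basepath, filename) = if url_path.extension().map(looks_like_file).unwrap_or(false) {
        // Last segment looks like a real file: keep its name and save next to it.
        let base = url_path.parent().map(|p| p.to_path_buf()).unwrap_or_default();
        let name = url_path.file_name().unwrap().to_string_lossy().to_string();
        (base, name)
    } else {
        // Otherwise treat the URL as a directory and store the page as index.html.
        (url_path, "index.html".to_string())
    };

    let should_parse = filename.ends_with(".html");
    (basepath, filename, should_parse)
}

// Hypothetical stand-in for valid_file_extension: the commit checks the extension
// against the IANA TLD list so that "example.com" is not treated as a file
// named "example" with a ".com" extension.
fn looks_like_file(ext: &OsStr) -> bool {
    matches!(
        ext.to_string_lossy().as_ref(),
        "html" | "css" | "js" | "png" | "jpg" | "svg" | "pdf"
    )
}

fn main() {
    let page = Url::parse("https://en.wikipedia.org/wiki/Rust_(programming_language)").unwrap();
    let image = Url::parse("https://en.wikipedia.org/static/images/logo.png").unwrap();
    println!("{:?}", plan_save(&page));  // directory + "index.html", parse = true
    println!("{:?}", plan_save(&image)); // ".../static/images" + "logo.png", parse = false
}
```

process() then uses the returned flag to skip the HTML parser for images and other non-HTML responses.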

src/main.rs (107 lines changed)
							| @@ -3,30 +3,29 @@ | ||||
| extern crate html5ever; | ||||
|  | ||||
| use std::{ | ||||
|     fs::File, | ||||
|     io::Read, | ||||
|     net::{IpAddr, Ipv4Addr}, | ||||
|     collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr} | ||||
| }; | ||||
|  | ||||
| use db::{connect, Website}; | ||||
| use metrics::{counter, gauge}; | ||||
| use metrics_exporter_prometheus::PrometheusBuilder; | ||||
| use s3::S3; | ||||
| use serde::Deserialize; | ||||
| use surrealdb::{engine::remote::ws::Client, Surreal}; | ||||
| use tokio::task::JoinSet; | ||||
| use tracing::{debug, error, info, instrument, trace, trace_span}; | ||||
| use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span}; | ||||
| use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry}; | ||||
|  | ||||
| mod db; | ||||
| mod parser; | ||||
| mod s3; | ||||
| mod filesystem; | ||||
|  | ||||
| const GET_METRIC: &str = "total_gets"; | ||||
| const GET_IN_FLIGHT: &str = "gets_in_flight"; | ||||
| const SITES_CRAWLED: &str = "pages_crawled"; | ||||
| const BEING_PROCESSED: &str = "pages_being_processed"; | ||||
|  | ||||
| const BATCH_SIZE: usize = 2; | ||||
|  | ||||
| #[derive(Deserialize)] | ||||
| struct Config { | ||||
|     surreal_ns: String, | ||||
| @@ -35,24 +34,24 @@ struct Config { | ||||
|     surreal_username: String, | ||||
|     surreal_password: String, | ||||
|  | ||||
|     s3_url: String, | ||||
|     s3_bucket: String, | ||||
|     s3_access_key: String, | ||||
|     s3_secret_key: String, | ||||
|  | ||||
|     crawl_filter: String, | ||||
|     start_url: String, | ||||
|     budget: usize, | ||||
| } | ||||
|  | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     println!("Logs and metrics are provided to the Grafana dashboard"); | ||||
|  | ||||
|     let writer = std::fs::OpenOptions::new() | ||||
|         .append(true) | ||||
|         .create(true) | ||||
|         .open("./docker/logs/tracing.log") | ||||
|         .expect("Couldn't make log file!"); | ||||
|  | ||||
|     let filter = EnvFilter::from_default_env(); | ||||
|     let filter = EnvFilter::builder() | ||||
|         .with_default_directive(LevelFilter::DEBUG.into()) | ||||
|         .from_env_lossy(); | ||||
|  | ||||
|     let registry = Registry::default().with( | ||||
|         fmt::layer() | ||||
| @@ -75,9 +74,8 @@ async fn main() { | ||||
|         .install() | ||||
|         .expect("failed to install recorder/exporter"); | ||||
|  | ||||
|     debug!("Starting..."); | ||||
|     // Would probably take these in as parameters from a cli | ||||
|     let starting_url = "https://en.wikipedia.org/"; | ||||
|     info!("Starting..."); | ||||
|  | ||||
|     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored. | ||||
|     // let crawl_filter = "en.wikipedia.org/"; | ||||
|     // let budget = 50; | ||||
| @@ -88,13 +86,11 @@ async fn main() { | ||||
|     let _ = file.read_to_string(&mut buf); | ||||
|  | ||||
|     let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml"); | ||||
|     let starting_url = &config.start_url; | ||||
|  | ||||
|     let db = connect(&config) | ||||
|         .await | ||||
|         .expect("Failed to connect to surreal, aborting."); | ||||
|     let s3 = S3::connect(&config) | ||||
|         .await | ||||
|         .expect("Failed to connect to minio, aborting.\n\nThis probably means you need to login to the minio console and get a new access key!\n\n(Probably here) http://localhost:9001/access-keys/new-account\n\n"); | ||||
|  | ||||
|     let reqwest = reqwest::Client::builder() | ||||
|         // .use_rustls_tls() | ||||
| @@ -108,38 +104,27 @@ async fn main() { | ||||
|     let pre_loop_span = span.enter(); | ||||
|     // Download the site | ||||
|     let site = Website::new(starting_url, false); | ||||
|     process(site, db.clone(), reqwest.clone(), s3.clone()).await; | ||||
|     process(site, db.clone(), reqwest.clone()).await; | ||||
|  | ||||
|     drop(pre_loop_span); | ||||
|  | ||||
|     let span = trace_span!("Loop"); | ||||
|     let span = span.enter(); | ||||
|     while crawled < config.budget { | ||||
|         let get_num = if config.budget - crawled < 100 { | ||||
|             config.budget - crawled | ||||
|         } else { | ||||
|             100 | ||||
|         }; | ||||
|  | ||||
|         let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await; | ||||
|         let uncrawled = get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone()).await; | ||||
|         if uncrawled.is_empty() { | ||||
|             info!("Had more budget but finished crawling everything."); | ||||
|             return; | ||||
|         } | ||||
|         debug!("Crawling {} pages...", uncrawled.len()); | ||||
|  | ||||
|         let span = trace_span!("Crawling"); | ||||
|         let _ = span.enter(); | ||||
|  | ||||
|         { | ||||
|             let mut futures = JoinSet::new(); | ||||
|             for site in uncrawled { | ||||
|                 gauge!(BEING_PROCESSED).increment(1); | ||||
|                 futures.spawn(process(site, db.clone(), reqwest.clone(), s3.clone())); | ||||
|                 futures.spawn(process(site, db.clone(), reqwest.clone())); | ||||
|                 // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32); | ||||
|                 // info!("Crawled {crawled} out of {budget} pages. ({percent})"); | ||||
|             } | ||||
|             debug!("Joining {} futures...", futures.len()); | ||||
|  | ||||
|             let c = counter!(SITES_CRAWLED); | ||||
|             // As futures complete runs code in while block | ||||
| @@ -152,13 +137,22 @@ async fn main() { | ||||
|     } | ||||
|     drop(span); | ||||
|  | ||||
|     debug!("Done"); | ||||
|     if let Ok(mut ok) = db.query("count(select id from website where crawled = true)").await { | ||||
|         let res = ok.take::<Option<usize>>(0); | ||||
|         if let Ok(i) = res { | ||||
|             if let Some(n) = i { | ||||
|                 info!("Total crawled pages now equals {n}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
| #[instrument(skip(db, s3, reqwest))] | ||||
|     info!("Done"); | ||||
| } | ||||
|  | ||||
| #[instrument(skip(db, reqwest))] | ||||
| /// Downloads and crawls and stores a webpage. | ||||
| /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver | ||||
| async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) { | ||||
| async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) { | ||||
|      | ||||
|     // METRICS | ||||
|     trace!("Process: {}", &site.site); | ||||
| @@ -172,31 +166,44 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien | ||||
|     // Send the http request (get) | ||||
|     if let Ok(response) = request_builder.send().await { | ||||
|  | ||||
|         // METRICS | ||||
|         g.decrement(1); | ||||
|         counter!(GET_METRIC).increment(1); | ||||
|  | ||||
|         // TODO if this will fail if the object we are downloading is | ||||
|         // larger than the memory of the device it's running on. | ||||
|         // We should store it *as* we download it then parse it in-place. | ||||
|         // Get body from response | ||||
|         let data = response | ||||
|             .text() | ||||
|             .await | ||||
|             .expect("Failed to read http response's body!"); | ||||
|         // Store document | ||||
|         s3.store(&data, &site.site).await; | ||||
|  | ||||
|         // METRICS | ||||
|         g.decrement(1); | ||||
|         counter!(GET_METRIC).increment(1); | ||||
|  | ||||
|         // Store document | ||||
|         let should_parse = filesystem::store(&data, &site.site).await; | ||||
|  | ||||
|         if should_parse { | ||||
|             // Parse document and get relationships | ||||
|             let sites = parser::parse(&site, &data).await; | ||||
|          | ||||
|             // De-duplicate this list | ||||
|             let prev_len = sites.len(); | ||||
|             let set = sites.into_iter().fold(HashSet::new(), |mut set,item| { | ||||
|                 set.insert(item); | ||||
|                 set | ||||
|             }); | ||||
|             let de_dupe_sites: Vec<Website> = set.into_iter().collect(); | ||||
|             let diff = prev_len - de_dupe_sites.len(); | ||||
|             trace!("Saved {diff} from being entered into the db by de-duping"); | ||||
|  | ||||
|             // Store all the other sites so that we can link to them. | ||||
|             let _ = Website::store_all(de_dupe_sites, &db).await; | ||||
|         } | ||||
|  | ||||
|         // update self in db | ||||
|         site.set_crawled(); | ||||
|         Website::store_all(vec![site], &db).await; | ||||
|  | ||||
|         // Store all the other sites so that we can link to them. | ||||
|         // let mut links_to = Vec::new(); | ||||
|         let _ = Website::store_all(sites, &db).await; | ||||
|  | ||||
|         // Make the database's links reflect the html links between sites | ||||
|         // site.links_to(others, &db).await; | ||||
|     } else { | ||||
|         error!("Failed to get: {}", &site.site); | ||||
|     } | ||||
| @@ -209,9 +216,11 @@ async fn get_uncrawled_links( | ||||
|     mut count: usize, | ||||
|     filter: String, | ||||
| ) -> Vec<Website> { | ||||
|     if count > 100 { | ||||
|         count = 100 | ||||
|  | ||||
|     if count > BATCH_SIZE { | ||||
|         count = BATCH_SIZE; | ||||
|     } | ||||
|  | ||||
|     debug!("Getting uncrawled links"); | ||||
|  | ||||
|     let mut response = db | ||||
|   | ||||
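The comment on process() notes that the db and reqwest handles are cheap to clone because they wrap Arcs internally; reqwest's Client in particular shares one connection pool across clones. A small sketch of that pattern with a JoinSet, assuming tokio and reqwest; the URLs are placeholders.

```rust
use tokio::task::JoinSet; // assumes tokio (rt-multi-thread, macros) and reqwest

#[tokio::main]
async fn main() {
    // One client, cloned per task; every clone shares the same connection pool.
    let client = reqwest::Client::new();
    let urls = ["https://example.com/", "https://example.org/"];

    let mut tasks = JoinSet::new();
    for url in urls {
        let client = client.clone();
        tasks.spawn(async move {
            match client.get(url).send().await {
                Ok(resp) => println!("{url} -> {}", resp.status()),
                Err(err) => eprintln!("{url} failed: {err}"),
            }
        });
    }
    while let Some(res) = tasks.join_next().await {
        let _ = res; // ignore individual task results in this sketch
    }
}
```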
| @@ -5,18 +5,21 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult}; | ||||
| use html5ever::tokenizer::{StartTag, TagToken}; | ||||
| use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts}; | ||||
| use html5ever::{local_name, tendril::*}; | ||||
| use tracing::instrument; | ||||
| use tracing::{debug, error, instrument, trace, warn}; | ||||
| use url::Url; | ||||
|  | ||||
| use crate::db::Website; | ||||
|  | ||||
| impl TokenSink for Website { | ||||
|     type Handle = Vec<Website>; | ||||
|  | ||||
|     #[instrument(skip(token, _line_number))] | ||||
|     fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> { | ||||
|         match token { | ||||
|             TagToken(tag) => { | ||||
|                 if tag.kind == StartTag { | ||||
|                     match tag.name { | ||||
|                         // this should be all the html elements that have links | ||||
|                         local_name!("a") | ||||
|                         | local_name!("audio") | ||||
|                         | local_name!("area") | ||||
| @@ -31,23 +34,18 @@ impl TokenSink for Website { | ||||
|                                 let attr_name = attr.name.local.to_string(); | ||||
|                                 if attr_name == "src" || attr_name == "href" || attr_name == "data" | ||||
|                                 { | ||||
|                                     // Get clone of the current site object | ||||
|                                     let mut web = self.clone(); | ||||
|  | ||||
|                                     // Set url | ||||
|                                     let mut url = web.site; | ||||
|                                     url.set_fragment(None); // removes #xyz | ||||
|                                     let joined = url | ||||
|                                         .join(&attr.value) | ||||
|                                         .expect("Failed to join url during parsing!"); | ||||
|                                     web.site = joined; | ||||
|  | ||||
|                                     web.crawled = false; | ||||
|                                     trace!("Found `{}` in html `{}` tag", &attr.value, tag.name); | ||||
|                                     let url = try_get_url(&self.site, &attr.value); | ||||
|  | ||||
|                                     if let Some(mut parsed) = url { | ||||
|                                         parsed.set_query(None); | ||||
|                                         parsed.set_fragment(None); | ||||
|                                         debug!("Final cleaned URL: `{}`", parsed.to_string()); | ||||
|                                         let web = Website::new(&parsed.to_string(), false); | ||||
|                                         links.push(web); | ||||
|                                     } | ||||
|                                 } | ||||
|  | ||||
|                             } | ||||
|                             return TokenSinkResult::Script(links); | ||||
|                         } | ||||
|                         local_name!("button") | local_name!("meta") | local_name!("iframe") => { | ||||
| @@ -76,7 +74,7 @@ pub async fn parse(site: &Website, data: &str) -> Vec<Website> { | ||||
|     token_buffer.push_back( | ||||
|         chunk | ||||
|             .try_reinterpret::<fmt::UTF8>() | ||||
|             .expect("Failed to reinterprt chunk!"), | ||||
|             .expect("Failed to reinterpret chunk!"), | ||||
|     ); | ||||
|     // create the tokenizer | ||||
|     let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default()); | ||||
| @@ -92,3 +90,56 @@ pub async fn parse(site: &Website, data: &str) -> Vec<Website> { | ||||
|     other_sites | ||||
| } | ||||
|  | ||||
| #[instrument] | ||||
| fn try_get_url(parent: &Url, link: &str) -> Option<Url> { | ||||
|     match Url::parse(link) { | ||||
|         Ok(ok) => Some(ok), | ||||
|         Err(e) => { | ||||
|             if link.starts_with('#') { | ||||
|                 trace!("Rejecting # url"); | ||||
|                 None | ||||
|             } else if link.starts_with("//") { | ||||
|                 // if a url starts with "//" is assumed that it will adopt | ||||
|                 // the same scheme as it's parent | ||||
|                 // https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute | ||||
|                 let scheme = parent.scheme(); | ||||
|  | ||||
|                 match Url::parse(&format!("{scheme}://{}", link)) { | ||||
|                     Ok(url) => Some(url), | ||||
|                     Err(err) => { | ||||
|                         error!("Failed parsing realative scheme url: {}", err); | ||||
|                         None | ||||
|                     } | ||||
|                 } | ||||
|             } else { | ||||
|                 // # This is some sort of realative url, gonna try patching it up into an absolute | ||||
|                 // url | ||||
|                 match e { | ||||
|                     url::ParseError::RelativeUrlWithoutBase => { | ||||
|                         // Is: scheme://host:port | ||||
|                         let origin = parent.origin().ascii_serialization(); | ||||
|                         let url = origin.clone() + link; | ||||
|  | ||||
|                         trace!("Built `{url}` from `{origin} + {}`", link.to_string()); | ||||
|  | ||||
|                         if let Ok(url) = Url::parse(&url) { | ||||
|                             trace!("Saved relative url `{}` AS: `{}`", link, url); | ||||
|                             Some(url) | ||||
|                         } else { | ||||
|                             error!( | ||||
|                                 "Failed to reconstruct a url from relative url: `{}` on site: `{}`", | ||||
|                                 link, | ||||
|                                 parent.to_string() | ||||
|                             ); | ||||
|                             None | ||||
|                         } | ||||
|                     } | ||||
|                     _ => { | ||||
|                         error!("MISC error: {:?} {:?}", e, link); | ||||
|                         None | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
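try_get_url turns href/src/data attribute values into absolute URLs, covering fragment-only, protocol-relative, absolute, and path-relative links. A condensed sketch of the same cases, assuming the `url` crate:

```rust
use url::Url; // assumes the url crate

// Resolve an href/src attribute against the page it was found on,
// mirroring the cases handled by the commit's try_get_url.
fn resolve(parent: &Url, link: &str) -> Option<Url> {
    if link.starts_with('#') {
        return None; // in-page anchor, nothing new to crawl
    }
    if let Some(rest) = link.strip_prefix("//") {
        // Protocol-relative: adopt the parent's scheme.
        return Url::parse(&format!("{}://{}", parent.scheme(), rest)).ok();
    }
    match Url::parse(link) {
        Ok(absolute) => Some(absolute),
        // Relative path: rebuild from the parent's origin (scheme://host:port).
        Err(url::ParseError::RelativeUrlWithoutBase) => {
            Url::parse(&(parent.origin().ascii_serialization() + link)).ok()
        }
        Err(_) => None,
    }
}

fn main() {
    let parent = Url::parse("https://en.wikipedia.org/wiki/Rust").unwrap();
    assert!(resolve(&parent, "#History").is_none());
    assert_eq!(
        resolve(&parent, "//upload.wikimedia.org/logo.svg").unwrap().as_str(),
        "https://upload.wikimedia.org/logo.svg"
    );
    assert_eq!(
        resolve(&parent, "/wiki/Crates.io").unwrap().as_str(),
        "https://en.wikipedia.org/wiki/Crates.io"
    );
    println!("all link forms resolved");
}
```

Unlike the removed Url::join call, which used expect and would panic on an unparsable href, unresolvable links are simply dropped and logged.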

src/s3.rs (100 lines, file deleted)
							| @@ -1,100 +0,0 @@ | ||||
| use metrics::counter; | ||||
| use minio::s3::{ | ||||
|     args::{BucketExistsArgs, MakeBucketArgs}, | ||||
|     client::ClientBuilder, | ||||
|     creds::StaticProvider, | ||||
|     error::Error, | ||||
|     http::BaseUrl, | ||||
|     Client, | ||||
| }; | ||||
| use tracing::{instrument, trace, warn}; | ||||
| use url::Url; | ||||
|  | ||||
| use crate::{db::Website, Config}; | ||||
|  | ||||
| const S3_ROUND_TRIP_METRIC: &str = "s3_trips"; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct S3 { | ||||
|     bucket_name: String, | ||||
|     client: Client, | ||||
| } | ||||
|  | ||||
| impl S3 { | ||||
|     #[instrument(skip_all, name = "S3")] | ||||
|     pub async fn connect(config: &Config) -> Result<Self, Error> { | ||||
|         let base_url = config | ||||
|             .s3_url | ||||
|             .parse::<BaseUrl>() | ||||
|             .expect("Failed to parse url into BaseUrl"); | ||||
|  | ||||
|         let static_provider = | ||||
|             StaticProvider::new(&config.s3_access_key, &config.s3_secret_key, None); | ||||
|  | ||||
|         let client = ClientBuilder::new(base_url) | ||||
|             .provider(Some(Box::new(static_provider))) | ||||
|             .build()?; | ||||
|  | ||||
|         trace!("Checking bucket..."); | ||||
|         let exists = client | ||||
|             .bucket_exists( | ||||
|                 &BucketExistsArgs::new(&config.s3_bucket) | ||||
|                     .expect("Failed to check if bucket exists"), | ||||
|             ) | ||||
|             .await?; | ||||
|         counter!(S3_ROUND_TRIP_METRIC).increment(1); | ||||
|  | ||||
|         if !exists { | ||||
|             trace!("Creating bucket..."); | ||||
|             client | ||||
|                 .make_bucket( | ||||
|                     &MakeBucketArgs::new(&config.s3_bucket).expect("Failed to create bucket!"), | ||||
|                 ) | ||||
|                 .await?; | ||||
|         } | ||||
|         counter!(S3_ROUND_TRIP_METRIC).increment(1); | ||||
|  | ||||
|         trace!("Connection successful"); | ||||
|  | ||||
|         Ok(Self { | ||||
|             bucket_name: config.s3_bucket.to_owned(), | ||||
|             client, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[instrument(name = "s3_store", skip_all)] | ||||
|     pub async fn store(&self, data: &str, url: &Url) { | ||||
|         let counter = counter!(S3_ROUND_TRIP_METRIC); | ||||
|  | ||||
|         let filename = Website::get_url_as_string(url); | ||||
|         trace!("Storing {} as {filename}", url.to_string()); | ||||
|  | ||||
|         counter.increment(1); | ||||
|         match &self | ||||
|             .client | ||||
|             .put_object_content(&self.bucket_name, &filename, data.to_owned()) | ||||
|             .send() | ||||
|             .await | ||||
|         { | ||||
|             Ok(_) => {} | ||||
|             Err(err) => match err { | ||||
|                 Error::InvalidObjectName(_) => { | ||||
|                     // This code will really only run if the url has non-english chars | ||||
|                     warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try."); | ||||
|  | ||||
|                     let filename: String = Website::get_url_as_b64_path(url); | ||||
|  | ||||
|                     counter.increment(1); | ||||
|                     let _ = &self | ||||
|                         .client | ||||
|                         .put_object_content(&self.bucket_name, &filename, data.to_owned()) | ||||
|                         .send() | ||||
|                         .await | ||||
|                         .unwrap(); | ||||
|                 } | ||||
|                 _ => {} | ||||
|             }, | ||||
|         }; | ||||
|     } | ||||
| } | ||||
|  | ||||

src/tlds-alpha-by-domain.txt (1444 lines, new file; diff suppressed because it is too large)