Compare commits

9 Commits

SHA1 Message Date
2c339a36f9 handle checking for file better 2025-10-09 23:00:11 -06:00
73216f7003 fix the issue where nothing works 2025-10-09 22:35:01 -06:00
1e59ebd5c4 even when not downloading, update the database 2025-10-09 22:13:06 -06:00
52d5e101d0 bragging 2025-10-09 22:03:19 -06:00
5b728bacd6 close #24, make program aware of the files already on disk 2025-10-09 21:52:41 -06:00
b0fe7f4761 close #18, format 2025-10-09 21:52:06 -06:00
5ade5e36df closes #11 2025-08-08 23:39:44 -06:00
95b8af0356 Restart threads that prematurely ended 2025-08-08 23:35:01 -06:00
ad8d7c606d increase csma/ca time 2025-08-08 23:34:45 -06:00
5 changed files with 196 additions and 89 deletions

View File

@@ -40,14 +40,18 @@ $EDITOR Crawler.toml
 - [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need
 - [x] Control crawler via config file (no recompliation needed)
 
-3/17/25: Took >1hr to crawl 100 pages
-3/19/25: Took 20min to crawl 1000 pages
+### Feats
+3/17/25: Took >1hr to crawl 100 pages.
+3/19/25: Took 20min to crawl 1000 pages.
 This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
-3/20/25: Took 5min to crawl 1000 pages
-3/21/25: Took 3min to crawl 1000 pages
+3/20/25: Took 5min to crawl 1000 pages.
+3/21/25: Took 3min to crawl 1000 pages.
+7/.../25: Downloaded just shy of 12TB of data from a remote server.
 
 # About

View File

@@ -102,7 +102,7 @@ pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website>
         Ok(ok) => ok,
         Err(_err) => {
             // basically just CSMA/CA
-            let delay = rand::random_range(10..500);
+            let delay = rand::random_range(10..10_000);
             sleep(Duration::from_millis(delay)).await;
             fails += 1;
             // Don't get stuck here forever, failing...
@@ -146,9 +146,9 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
         .await?;
 
     let setup = include_bytes!("setup.surql");
-    let file = setup.iter().map(|c| *c as char).collect::<String>();
+    let init_commands = setup.iter().map(|c| *c as char).collect::<String>();
 
-    db.query(file)
+    db.query(init_commands)
         .await
         .expect("Failed to setup surreal tables.");
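For reference, the retry loop this hunk tunes is a randomized-backoff pattern (the code's own comment calls it CSMA/CA). Below is a minimal self-contained sketch, assuming tokio and rand 0.9's random_range as used in the diff; try_query, the fail limit, and the return type are hypothetical stand-ins, not the repo's exact code:

use std::time::Duration;
use tokio::time::sleep;

// Stand-in for the real SurrealDB call behind fn::get_next.
async fn try_query() -> Result<String, ()> {
    Err(())
}

// On each failure, wait a random 10..10_000 ms before retrying, and give up
// after a bounded number of attempts instead of spinning forever.
async fn get_next_with_backoff() -> Option<String> {
    let mut fails = 0;
    while fails < 3 {
        match try_query().await {
            Ok(site) => return Some(site),
            Err(_) => {
                let delay = rand::random_range(10..10_000);
                sleep(Duration::from_millis(delay)).await;
                fails += 1;
            }
        }
    }
    None
}

#[tokio::main]
async fn main() {
    match get_next_with_backoff().await {
        Some(site) => println!("got {site}"),
        None => println!("gave up after repeated failures"),
    }
}

Widening the range from 500 to 10_000 ms presumably spreads out the batch_size workers that all contend on the same fn::get_next query.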

View File

@@ -34,10 +34,38 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     url_path
 }
 
+pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
+    match tokio::fs::OpenOptions::new()
+        .write(false)
+        .read(true)
+        .create(false)
+        .open(file).await
+    {
+        Ok(file) => {
+            match file.metadata().await {
+                Ok(meta) => {
+                    return Some(meta.len())
+                },
+                Err(err) => {
+                    error!("Failed to get metadata. {}", err)
+                },
+            }
+        },
+        Err(err) => {
+            match err.kind() {
+                ErrorKind::NotFound => {/* ignore */},
+                _ => warn!("Failed to open file to check length. {:?} {}", file, err),
+            }
+        },
+    }
+    None
+}
+
 pub async fn init(filename: &PathBuf) -> Option<fs::File> {
     let file = async || tokio::fs::OpenOptions::new()
         .write(true)
-        .append(true)
+        .append(false)
         .create(true)
         .open(&filename).await;
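A standalone way to exercise a check_file_length-style helper is sketched below. This re-sketch uses tokio::fs::metadata, which stats the path without opening a handle, whereas the repo's version opens the file read-only and logs failures through tracing; the example path is hypothetical:

use std::path::PathBuf;

// Returns the on-disk size, or None if the file is missing or unreadable,
// in which case the caller simply downloads the content again.
async fn check_file_length(file: &PathBuf) -> Option<u64> {
    match tokio::fs::metadata(file).await {
        Ok(meta) => Some(meta.len()),
        Err(_) => None,
    }
}

#[tokio::main]
async fn main() {
    let path = PathBuf::from("example.html"); // hypothetical path
    match check_file_length(&path).await {
        Some(len) => println!("{len} bytes already on disk"),
        None => println!("not on disk yet, will download"),
    }
}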

View File

@@ -5,17 +5,27 @@
 extern crate html5ever;
 
 use std::{
-    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
+    collections::HashSet,
+    fs::File,
+    io::Read,
+    sync::{Arc, LazyLock},
 };
 
-use futures_util::StreamExt;
-use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
-use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
+use futures_util::StreamExt;
+use opentelemetry::{
+    global::{self},
+    metrics::{Counter, Meter, UpDownCounter},
+};
+use opentelemetry_otlp::{Protocol, WithExportConfig};
 use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
+use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
+    sync::RwLock,
+    task::JoinSet,
+};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
@@ -26,36 +36,29 @@ mod filesystem;
 mod parser;
 
 static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
-static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+static BATCH_SIZE: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
+static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_processed")
         .build()
-);
-static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_parsed")
         .build()
-);
-static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_streamed")
         .build()
-);
-static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
-    METER
-        .i64_up_down_counter("crawler_gets_in_flight")
-        .build()
-);
-static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_bytes_down")
-        .build()
-);
-static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_sites_crawled")
-        .build()
-);
+});
+static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
+    LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
+static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
+static SITES_CRAWLED: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
 
 static CONFIG: LazyLock<Config> = LazyLock::new(|| {
     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
@@ -96,6 +99,8 @@ async fn main() {
     global::set_tracer_provider(load_tracing(&CONFIG));
     global::set_meter_provider(load_metrics(&CONFIG));
 
+    BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);
+
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
@@ -123,15 +128,40 @@ async fn main() {
     // let mut main_loop_span= TRACER.start("Main-Loop");
     let mut futures = JoinSet::new();
     for _ in 0..CONFIG.batch_size {
-        futures.spawn(process_single_thread(&CONFIG, db.clone(), reqwest.clone(), crawled.clone()));
+        futures.spawn(process_single_thread(
+            &CONFIG,
+            db.clone(),
+            reqwest.clone(),
+            crawled.clone(),
+        ));
     }
 
+    while let Some(_) = futures.join_next().await {
+        // Budget - Threads - This thread (1)
+        // Would roughly be the acceptable amount at which a thread should exit
+        if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
+            warn!("Thread terminated early, restarting");
+            futures.spawn(process_single_thread(
+                &CONFIG,
+                db.clone(),
+                reqwest.clone(),
+                crawled.clone(),
+            ));
+        }
+    }
+
     futures.join_all().await;
     // main_loop_span.end();
     info!("Done");
 }
 
-async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
+async fn process_single_thread(
+    config: &Config,
+    db: Surreal<Client>,
+    reqwest: reqwest::Client,
+    crawled: Arc<RwLock<usize>>,
+) {
     while *(crawled.read().await) < config.budget {
         let uncrawled = get_next(&db.clone(), &config).await;
         match uncrawled {
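The restart threshold in the hunk above (budget - batch_size - 1) is easier to see with concrete numbers. A small sketch with illustrative values, not the repo's actual config; note the subtraction would underflow on usize if budget were not larger than batch_size + 1, which the diff implicitly assumes:

// With budget = 1000 and batch_size = 32, a worker that exits while fewer
// than 1000 - 32 - 1 = 967 pages are crawled is respawned; past that point
// the remaining workers can finish the budget on their own.
fn should_restart(crawled: usize, budget: usize, batch_size: usize) -> bool {
    crawled < budget - batch_size - 1
}

fn main() {
    assert!(should_restart(500, 1000, 32)); // mid-crawl: restart the worker
    assert!(!should_restart(980, 1000, 32)); // near budget: let it wind down
}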
@@ -141,11 +171,11 @@ async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: re
                 // Somehow this write doesn't hang on the while's read?
                 let mut c = crawled.write().await;
                 *c += 1;
-            },
+            }
             None => {
                 warn!("fn::get_next() returned None");
                 return;
-            },
+            }
         }
     }
 }
@@ -154,7 +184,6 @@ async fn process_single_thread(config: &Config, db: Surreal<Client>, reqwest: re
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
-
     // METRICS
     debug!(url = &site.site.as_str(), "Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
@@ -166,6 +195,8 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     // Send the http request (get)
     GET_IN_FLIGHT.add(1, &[]);
     if let Ok(response) = request_builder.send().await {
+        let mut skip_download = false;
+
         GET_IN_FLIGHT.add(-1, &[]);
         let headers = response.headers();
@@ -180,33 +211,76 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         let ct = match (CT, ct) {
             (None, None) => {
-                warn!("Server did not respond with Content-Type header. Url: {} Headers: ({:?})", site.site.to_string(), headers);
-                return
-            },
+                warn!(
+                    "Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
+                    site.site.to_string(),
+                    headers
+                );
+                return;
+            }
             (None, Some(a)) => a,
             (Some(a), None) => a,
             (Some(a), Some(_)) => a,
         };
 
         // create filepath (handles / -> /index.html)
-        let path = filesystem::as_path(&site.site, ct);
-        let mut tmp_path= path.clone();
+        let real_path = filesystem::as_path(&site.site, ct);
+        let mut tmp_path = real_path.clone();
         if !(tmp_path.add_extension("crawl_temp")) {
             warn!("Failed to add extension to file");
             // fallback ig
             tmp_path = tmp_path.with_extension("crawl_temp");
         }
 
+        // CODE FOR UPDATING DOWNLOADED CONTENT:
+        // Check the Content-Length header (we assume the server is telling the truth) (I don't see
+        // a reason for it to lie in this case).
+        // And see if the file on the disk is the same length.
+        // Yes, technically this isn't perfect, but the other option is storing ETags, which I
+        // don't want to do right now.
+        if let Some(len) = headers.get("Content-Length") {
+            if let Ok(s) = len.to_str() {
+                // length is in bytes
+                if let Ok(len) = s.parse::<u64>() {
+                    if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
+                        if disk_len == len {
+                            skip_download = true;
+                        }
+                    } else {
+                        // File not found (or other error).
+                        // Program will continue on it's way, downloading content.
+                    }
+                }
+            }
+        }
+
         // make sure that the file is good to go
         if let Some(file) = filesystem::init(&tmp_path).await {
             // Get body from response
             // stream the response onto the disk
             let mut stream = response.bytes_stream();
-            let should_parse = path.to_string_lossy().ends_with(".html");
-            let mut writer = BufWriter::new(file);
+            let should_parse = real_path.to_string_lossy().ends_with(".html");
             let mut buf: Vec<u8> = Vec::new();
 
+            if skip_download && should_parse {
+                // since we are skipping the download we will just read the file off the disk to
+                // parse it
+                if let Ok(mut file) = tokio::fs::OpenOptions::new()
+                    .read(true)
+                    .open(&real_path).await
+                {
+                    if let Err(err) = file.read_to_end(&mut buf).await {
+                        warn!("Failed to read file off disk for parsing, {}", err);
+                    }
+                }
+            }
+
+            // !!!DOWNLOADING TIME!!!
+            if !skip_download {
+                let mut writer = BufWriter::new(file);
+
                 // Write file to disk
                 trace!("Writing at: {:?}", tmp_path);
                 BEING_STREAMED.add(1, &[]);
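Condensed, the skip-download decision this hunk introduces amounts to the following sketch (the already_downloaded helper is hypothetical; the repo inlines the logic in process() and compares against filesystem::check_file_length). As the commit's own comment notes, equal length is treated as equal content, a deliberate shortcut instead of storing ETags:

use std::path::PathBuf;
use reqwest::header::HeaderMap;

// Trust the server's Content-Length and compare it to the file on disk.
// Any missing header, unparsable value, or missing file means "download".
async fn already_downloaded(headers: &HeaderMap, real_path: &PathBuf) -> bool {
    let Some(len) = headers.get("Content-Length") else { return false };
    let Ok(s) = len.to_str() else { return false };
    let Ok(remote_len) = s.parse::<u64>() else { return false };
    match tokio::fs::metadata(real_path).await {
        Ok(meta) => meta.len() == remote_len, // same length: assume same content
        Err(_) => false,                      // not on disk: download it
    }
}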
@@ -222,18 +296,18 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                     if should_parse {
                         data.iter().for_each(|f| buf.push(*f));
                     }
-                },
+                }
                 Err(err) => {
                     error!("{err}")
-                },
+                }
             }
         }
 
         let _ = writer.flush().await;
 
         // rename the temp file into the real file name
-        if let Err(err) = tokio::fs::rename(&tmp_path, &path).await {
+        if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
             error!(
                 from = &*tmp_path.to_string_lossy(),
-                to = &*path.to_string_lossy(),
+                to = &*real_path.to_string_lossy(),
                 "Error renaming file: {}",
                 err
             );
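The temp-file-then-rename dance above is a standard atomic-publish pattern, sketched here in isolation (paths and body are placeholders). Note that with_extension replaces an existing extension rather than appending one, which is why the diff prefers the nightly add_extension and only falls back to with_extension:

use std::path::PathBuf;
use tokio::io::AsyncWriteExt;

// Write everything to a ".crawl_temp" sibling first, then rename. Readers
// never observe a half-written file: on the same filesystem, the rename
// replaces the target in one step.
async fn publish_atomically(real_path: &PathBuf, body: &[u8]) -> std::io::Result<()> {
    let tmp_path = real_path.with_extension("crawl_temp");
    let mut file = tokio::fs::File::create(&tmp_path).await?;
    file.write_all(body).await?;
    file.flush().await?;
    tokio::fs::rename(&tmp_path, real_path).await
}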
@@ -241,6 +315,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
             // stream_span.end();
             BEING_STREAMED.add(-1, &[]);
+            }
 
             // (If needed) Parse the file
             if should_parse {
@@ -319,7 +394,7 @@ fn load_logging(config: &Config) {
             .with_file(true)
             .json()
             .with_writer(writer)
-            .with_filter(filter)
+            .with_filter(filter),
     );
 
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
@@ -338,5 +413,3 @@ fn load_metrics(config: &Config) -> SdkMeterProvider {
         .build();
 
     metrics_provider
 }
-
-

View File

@@ -14,3 +14,5 @@ DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
     UPDATE $site.id SET processing = true;
     RETURN $site
 };
+
+UPDATE website SET processing = false WHERE processing = true;
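Because connect() replays setup.surql on every start (see the db changes above), appending this UPDATE means rows left with processing = true by a crashed or killed run are unlocked before new workers spawn. A minimal sketch of that startup path, with a hypothetical endpoint and namespace, and sign-in omitted:

use surrealdb::engine::remote::ws::Ws;
use surrealdb::Surreal;

#[tokio::main]
async fn main() -> surrealdb::Result<()> {
    // hypothetical address, namespace, and database; auth omitted for brevity
    let db = Surreal::new::<Ws>("localhost:8000").await?;
    db.use_ns("crawler").use_db("crawler").await?;

    // setup.surql now ends with the stale-lock reset shown above,
    // so it runs on every startup before any worker asks for work
    let setup = include_str!("setup.surql");
    db.query(setup).await?;
    Ok(())
}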