Compare commits

14 Commits

SHA1 Message Date
2c339a36f9 handle checking for file better 2025-10-09 23:00:11 -06:00
73216f7003 fix the issue where nothing works 2025-10-09 22:35:01 -06:00
1e59ebd5c4 even when not downloading, update the database 2025-10-09 22:13:06 -06:00
52d5e101d0 bragging 2025-10-09 22:03:19 -06:00
5b728bacd6 close #24, make program aware of the files already on disk 2025-10-09 21:52:41 -06:00
b0fe7f4761 close #18, format 2025-10-09 21:52:06 -06:00
5ade5e36df closes #11 2025-08-08 23:39:44 -06:00
95b8af0356 Restart threads that prematurely ended 2025-08-08 23:35:01 -06:00
ad8d7c606d increase csma/ca time 2025-08-08 23:34:45 -06:00
f3a51065b5 remove fixme 2025-07-17 09:37:03 -06:00
343d3a7570 better logging 2025-07-17 09:36:37 -06:00
e535bcc295 Merge branch 'main' of https://git.oliveratkinson.net/Oliver/internet_mapper 2025-07-17 08:59:32 -06:00
a0fd81d956 better config file 2025-07-17 08:58:30 -06:00
5cbba33a09 update how the database interactions work 2025-07-17 08:52:47 -06:00
9 changed files with 358 additions and 199 deletions

Cargo.lock (generated)

@@ -1862,6 +1862,7 @@ dependencies = [
  "opentelemetry",
  "opentelemetry-otlp",
  "opentelemetry_sdk",
+ "rand 0.9.1",
  "reqwest",
  "serde",
  "surrealdb",
@@ -2610,7 +2611,7 @@ dependencies = [
  "futures-util",
  "opentelemetry",
  "percent-encoding",
- "rand 0.9.0",
+ "rand 0.9.1",
  "serde_json",
  "thiserror 2.0.12",
 ]
@@ -2998,7 +2999,7 @@ checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
 dependencies = [
  "bytes",
  "getrandom 0.3.2",
- "rand 0.9.0",
+ "rand 0.9.1",
  "ring",
  "rustc-hash 2.1.1",
  "rustls",
@@ -3069,13 +3070,12 @@ dependencies = [
 [[package]]
 name = "rand"
-version = "0.9.0"
+version = "0.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
+checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
 dependencies = [
  "rand_chacha 0.9.0",
  "rand_core 0.9.3",
- "zerocopy 0.8.23",
 ]
 
 [[package]]
@@ -4711,7 +4711,7 @@ version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
 dependencies = [
- "rand 0.9.0",
+ "rand 0.9.1",
  "serde",
  "web-time",
 ]

Cargo.toml

@@ -12,6 +12,7 @@ metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
 opentelemetry = "0.30.0"
 opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] }
 opentelemetry_sdk = "0.30.0"
+rand = "0.9.1"
 reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
 serde = { version = "1.0", features = ["derive"] }
 surrealdb = "2.2"

Crawler.toml

@@ -1,3 +1,11 @@
+# Visability config
+# Alloy (for Tempo)
+tracing_endpoint = "http://localhost:4317"
+# Prometheus
+metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
+# Alloy (for Loki)
+log_file = "./docker/logs/tracing.log"
+
 # Surreal config
 surreal_url = "localhost:8000"
 surreal_username = "root"
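Assembled from the fields this diff actually reads (tracing_endpoint, metrics_endpoint, log_file, the surreal_* settings, crawl_filter, start_url, budget, batch_size), a complete Crawler.toml would roughly take the shape below. Every value not visible in the hunk above is a placeholder, and surreal_password is an assumed field rather than one shown in this diff:

# Visability config
tracing_endpoint = "http://localhost:4317"
metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
log_file = "./docker/logs/tracing.log"

# Surreal config
surreal_url = "localhost:8000"
surreal_username = "root"
surreal_password = "root"   # assumed field, placeholder value
surreal_ns = "crawler"      # placeholder
surreal_db = "crawler"      # placeholder

# Crawl behaviour (placeholder values; the filter example comes from a comment in main.rs)
crawl_filter = "en.wikipedia.org/"
start_url = "https://en.wikipedia.org/"
budget = 1000
batch_size = 5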

README.md

@@ -40,14 +40,18 @@ $EDITOR Crawler.toml
 - [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need
 - [x] Control crawler via config file (no recompliation needed)
 
-3/17/25: Took >1hr to crawl 100 pages
-3/19/25: Took 20min to crawl 1000 pages
+### Feats
+
+3/17/25: Took >1hr to crawl 100 pages.
+3/19/25: Took 20min to crawl 1000 pages.
 This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
-3/20/25: Took 5min to crawl 1000 pages
-3/21/25: Took 3min to crawl 1000 pages
+3/20/25: Took 5min to crawl 1000 pages.
+3/21/25: Took 3min to crawl 1000 pages.
+
+7/.../25: Downloaded just shy of 12TB of data from a remote server.
 
 # About

src/db.rs

@@ -1,12 +1,13 @@
 use metrics::counter;
-use std::fmt::Debug;
 use serde::{Deserialize, Serialize};
+use std::{fmt::Debug, time::Duration};
 use surrealdb::{
     engine::remote::ws::{Client, Ws},
     opt::auth::Root,
     sql::Thing,
     Surreal,
 };
+use tokio::time::sleep;
 use tracing::{error, instrument, trace};
 use url::Url;
@@ -16,6 +17,7 @@ const STORE: &str = "surql_store_calls";
 #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
 pub struct Website {
+    pub id: Option<Thing>,
     /// The url that this data is found at
     pub site: Url,
     /// Wether or not this link has been crawled yet
@@ -46,6 +48,7 @@ impl Website {
             crawled,
             site,
             status_code: 0,
+            id: None,
         }
     }
@@ -56,13 +59,13 @@ impl Website {
         counter!(STORE).increment(1);
         let mut things = Vec::with_capacity(all.len());
 
-        // FIXME fails *sometimes* because "Resource Busy"
         match db
             .query(
                 "INSERT INTO website $array
                     ON DUPLICATE KEY UPDATE
                         accessed_at = time::now(),
                         status_code = $input.status_code,
+                        processing = false,
                         crawled = crawled OR $input.crawled
                     RETURN VALUE id;
                 ",
@@ -82,18 +85,47 @@
     }
 }
 
+/// Returns uncrawled links
+#[instrument(skip(db, config))]
+pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website> {
+    let mut res: Option<Website> = None;
+    let mut fails = 0;
+    while res == None {
+        let mut response = db
+            .query("fn::get_next($format)")
+            .bind(("format", config.crawl_filter.to_string()))
+            .await
+            .expect("Hard-coded query failed..?");
+        res = match response.take(0) {
+            Ok(ok) => ok,
+            Err(_err) => {
+                // basically just CSMA/CA
+                let delay = rand::random_range(10..10_000);
+                sleep(Duration::from_millis(delay)).await;
+                fails += 1;
+                // Don't get stuck here forever, failing...
+                // (most I've seen is 1)
+                if fails > 5 {
+                    error!("Max attempts to get_next() reached... ({fails})");
+                    return None
+                }
+                None
+            }
+        };
+    }
+    res
+}
+
 #[derive(Debug, Serialize)]
+#[allow(dead_code)]
 pub struct Email {
     pub email: String,
     pub on: String,
 }
 
-#[derive(Debug, Deserialize)]
-pub struct Record {
-    #[allow(dead_code)]
-    pub id: Thing,
-}
-
 #[instrument(skip_all, name = "SurrealDB")]
 pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
     trace!("Establishing connection to surreal...");
@@ -114,12 +146,11 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
         .await?;
 
     let setup = include_bytes!("setup.surql");
-    let file = setup.iter().map(|c| *c as char).collect::<String>();
+    let init_commands = setup.iter().map(|c| *c as char).collect::<String>();
 
-    db.query(file)
+    db.query(init_commands)
         .await
         .expect("Failed to setup surreal tables.");
 
     Ok(db)
 }

src/filesystem.rs

@@ -2,7 +2,7 @@ use std::{io::ErrorKind, path::PathBuf};
 use reqwest::header::HeaderValue;
 use tokio::fs;
-use tracing::{error, event, trace, warn, Level};
+use tracing::{error, trace, warn};
 use url::Url;
 
 pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
@@ -34,14 +34,46 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     url_path
 }
 
+pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
+    match tokio::fs::OpenOptions::new()
+        .write(false)
+        .read(true)
+        .create(false)
+        .open(file).await
+    {
+        Ok(file) => {
+            match file.metadata().await {
+                Ok(meta) => {
+                    return Some(meta.len())
+                },
+                Err(err) => {
+                    error!("Failed to get metadata. {}", err)
+                },
+            }
+        },
+        Err(err) => {
+            match err.kind() {
+                ErrorKind::NotFound => {/* ignore */},
+                _ => warn!("Failed to open file to check length. {:?} {}", file, err),
+            }
+        },
+    }
+    None
+}
+
 pub async fn init(filename: &PathBuf) -> Option<fs::File> {
     let file = async || tokio::fs::OpenOptions::new()
-        .append(true)
+        .write(true)
+        .append(false)
         .create(true)
         .open(&filename).await;
 
     match file().await {
-        Ok(ok) => Some(ok),
+        Ok(ok) => {
+            trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
+            Some(ok)
+        },
         Err(err) => {
             // the file/folder isn't found
             if err.kind() == ErrorKind::NotFound {
@@ -55,6 +87,14 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
             } else {
                 error!("Couldn't get file's parents: {:?}", &filename);
             }
+        } else if err.kind() == ErrorKind::NotADirectory {
+            // Example:
+            // 1. example.com/user
+            // 2. example.com/user/post
+            // If file 1 exists it will prevent file 2 from existing
+            // FIXME
+            error!("One of the parent directories is actually a file...")
         } else {
             error!("File open error: {err} {:?}", filename);
         }

src/main.rs

@@ -5,60 +5,79 @@
 extern crate html5ever;
 
 use std::{
-    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
+    collections::HashSet,
+    fs::File,
+    io::Read,
+    sync::{Arc, LazyLock},
 };
 
-use futures_util::StreamExt;
-use opentelemetry::{global::{self, BoxedTracer}, metrics::{Counter, Meter, UpDownCounter}, trace::{Span, Tracer}};
-use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
+use futures_util::StreamExt;
+use opentelemetry::{
+    global::{self},
+    metrics::{Counter, Meter, UpDownCounter},
+};
+use opentelemetry_otlp::{Protocol, WithExportConfig};
+use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
+use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
+    sync::RwLock,
+    task::JoinSet,
+};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
+use crate::db::get_next;
+
 mod db;
 mod filesystem;
 mod parser;
 
 static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
 
-static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+static BATCH_SIZE: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
+static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_processed")
         .build()
-);
-static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_parsed")
         .build()
-);
-static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+});
+static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
     METER
         .i64_up_down_counter("crawler_pages_being_streamed")
         .build()
-);
-static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
-    METER
-        .i64_up_down_counter("crawler_gets_in_flight")
-        .build()
-);
-static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_bytes_down")
-        .build()
-);
-static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
-    METER
-        .u64_counter("crawler_total_sites_crawled")
-        .build()
-);
+});
+static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
+    LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
+static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
+static SITES_CRAWLED: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
+
+static CONFIG: LazyLock<Config> = LazyLock::new(|| {
+    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
+    let mut buf = String::new();
+    let _ = file.read_to_string(&mut buf);
+
+    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
+    config
+});
 
 // FIXME Traces aren't working on multiple threads, they block
 // static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
 
 #[derive(Deserialize)]
 struct Config {
+    tracing_endpoint: String,
+    metrics_endpoint: String,
+    log_file: String,
+
     surreal_ns: String,
     surreal_db: String,
     surreal_url: String,
@@ -76,78 +95,20 @@ async fn main() {
     println!("Logs and metrics are provided to the Grafana dashboard");
 
     // Start TRACE / LOGGING / METRICS
-    // let otlp_log = opentelemetry_otlp::LogExporter::builder()
-    //     .with_tonic()
-    //     .with_endpoint(endpoint)
-    //     .build()
-    //     .unwrap();
-
-    // Send metrics to Prometheus
-    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
-        .with_http()
-        .with_protocol(Protocol::HttpBinary)
-        .with_endpoint("http://localhost:9090/api/v1/otlp/v1/metrics")
-        .build()
-        .unwrap();
-
-    // Send spans to Alloy (which will send them to Tempo)
-    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
-        .with_tonic()
-        .with_endpoint("http://localhost:4317")
-        .build()
-        .unwrap();
-
-    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
-        .with_simple_exporter(otlp_span)
-        .build();
-
-    // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
-    //     .with_simple_exporter(otlp_log)
-    //     .build();
-
-    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
-        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
-        .build();
-
-    global::set_tracer_provider(tracer_provider);
-    global::set_meter_provider(metrics_provider);
-    // How to set logger?
-    // End TRACE
-
-    // Start LOGGING
-    let writer = std::fs::OpenOptions::new()
-        .append(true)
-        .create(true)
-        .open("./docker/logs/tracing.log")
-        .expect("Couldn't make log file!");
-
-    let filter = EnvFilter::builder()
-        .with_default_directive(LevelFilter::DEBUG.into())
-        .from_env_lossy();
-
-    let registry = Registry::default().with(
-        fmt::layer()
-            .with_line_number(true)
-            .with_thread_ids(true)
-            .with_file(true)
-            .json()
-            .with_writer(writer)
-            .with_filter(filter)
-    );
-
-    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
-    // End LOGGING
+    load_logging(&CONFIG); // this seems to be working ok
+    global::set_tracer_provider(load_tracing(&CONFIG));
+    global::set_meter_provider(load_metrics(&CONFIG));
+
+    BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);
 
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
     let crawled = Arc::new(RwLock::new(0));
 
-    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
-    let mut buf = String::new();
-    let _ = file.read_to_string(&mut buf);
-
-    let config: Arc<Config> = Arc::new(toml::from_str(&buf).expect("Failed to parse Crawler.toml"));
-    let starting_url = &config.start_url;
+    let starting_url = &CONFIG.start_url;
 
-    let db = connect(&config)
+    let db = connect(&CONFIG)
         .await
         .expect("Failed to connect to surreal, aborting.");
@@ -166,28 +127,55 @@ async fn main() {
     // let mut main_loop_span= TRACER.start("Main-Loop");
 
     let mut futures = JoinSet::new();
-    for _ in 0..config.batch_size {
-        futures.spawn(process_single_thread(config.clone(), db.clone(), reqwest.clone(), crawled.clone()));
+    for _ in 0..CONFIG.batch_size {
+        futures.spawn(process_single_thread(
+            &CONFIG,
+            db.clone(),
+            reqwest.clone(),
+            crawled.clone(),
+        ));
     }
+
+    while let Some(_) = futures.join_next().await {
+        // Budget - Threads - This thread (1)
+        // Would roughly be the acceptable amount at which a thread should exit
+        if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
+            warn!("Thread terminated early, restarting");
+            futures.spawn(process_single_thread(
+                &CONFIG,
+                db.clone(),
+                reqwest.clone(),
+                crawled.clone(),
+            ));
+        }
+    }
+
     futures.join_all().await;
 
     // main_loop_span.end();
     info!("Done");
 }
 
-async fn process_single_thread(config: Arc<Config>, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
+async fn process_single_thread(
+    config: &Config,
+    db: Surreal<Client>,
+    reqwest: reqwest::Client,
+    crawled: Arc<RwLock<usize>>,
+) {
     while *(crawled.read().await) < config.budget {
-        let uncrawled = get_uncrawled_links(&db.clone(), 1, &config).await;
-        if uncrawled.is_empty() {
-            return
-        }
-
-        for site in uncrawled {
-            process(site, db.clone(), reqwest.clone()).await;
-            SITES_CRAWLED.add(1, &[]);
-            // Somehow this write doesn't hang on the while's read?
-            let mut c = crawled.write().await;
-            *c += 1;
+        let uncrawled = get_next(&db.clone(), &config).await;
+        match uncrawled {
+            Some(site) => {
+                process(site, db.clone(), reqwest.clone()).await;
+                SITES_CRAWLED.add(1, &[]);
+                // Somehow this write doesn't hang on the while's read?
+                let mut c = crawled.write().await;
+                *c += 1;
+            }
+            None => {
+                warn!("fn::get_next() returned None");
+                return;
+            }
         }
     }
 }
@@ -196,18 +184,19 @@ async fn process_single_thread(
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // METRICS
-    trace!(url = &site.site.as_str(), "Process: {}", &site.site);
+    debug!(url = &site.site.as_str(), "Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
     // let mut process_span = TRACER.start("Process");
 
     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
 
     // Send the http request (get)
     GET_IN_FLIGHT.add(1, &[]);
     if let Ok(response) = request_builder.send().await {
+        let mut skip_download = false;
         GET_IN_FLIGHT.add(-1, &[]);
         let headers = response.headers();
@@ -220,69 +209,113 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
         let CT = headers.get("Content-Type");
         let ct = headers.get("content-type");
 
-        let ct = match (CT,ct) {
+        let ct = match (CT, ct) {
             (None, None) => {
-                warn!("Server did not respond with Content-Type header. Url: {} Headers: ({:?})", site.site.to_string(), headers);
-                return
-            },
+                warn!(
+                    "Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
+                    site.site.to_string(),
+                    headers
+                );
+                return;
+            }
             (None, Some(a)) => a,
             (Some(a), None) => a,
             (Some(a), Some(_)) => a,
         };
 
         // create filepath (handles / -> /index.html)
-        let path = filesystem::as_path(&site.site, ct);
-        let mut tmp_path= path.clone();
+        let real_path = filesystem::as_path(&site.site, ct);
+        let mut tmp_path = real_path.clone();
         if !(tmp_path.add_extension("crawl_temp")) {
             warn!("Failed to add extension to file");
             // fallback ig
             tmp_path = tmp_path.with_extension("crawl_temp");
         }
 
+        // CODE FOR UPDATING DOWNLOADED CONTENT:
+        // Check the Content-Length header (we assume the server is telling the truth) (I don't see
+        // a reason for it to lie in this case).
+        // And see if the file on the disk is the same length.
+        // Yes, technically this isn't perfect, but the other option is storing ETags, which I
+        // don't want to do right now.
+        if let Some(len) = headers.get("Content-Length") {
+            if let Ok(s) = len.to_str() {
+                // length is in bytes
+                if let Ok(len) = s.parse::<u64>() {
+                    if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
+                        if disk_len == len {
+                            skip_download = true;
+                        }
+                    } else {
+                        // File not found (or other error).
+                        // Program will continue on it's way, downloading content.
+                    }
+                }
+            }
+        }
+
         // make sure that the file is good to go
         if let Some(file) = filesystem::init(&tmp_path).await {
             // Get body from response
             // stream the response onto the disk
             let mut stream = response.bytes_stream();
 
-            let should_parse = path.to_string_lossy().ends_with(".html");
-            let mut writer = BufWriter::new(file);
+            let should_parse = real_path.to_string_lossy().ends_with(".html");
             let mut buf: Vec<u8> = Vec::new();
 
-            // Write file to disk
-            trace!("Writing at: {:?}", tmp_path);
-            BEING_STREAMED.add(1, &[]);
-            // let mut stream_span = TRACER.start("Stream");
-            while let Some(data) = stream.next().await {
-                match data {
-                    Ok(data) => {
-                        TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
-                        let _ = writer.write_all(&data).await;
-                        // If we are going to parse this file later, we will save it
-                        // into memory as well as the disk.
-                        // We do this because the data here might be incomplete
-                        if should_parse {
-                            data.iter().for_each(|f| buf.push(*f));
-                        }
-                    },
-                    Err(err) => {
-                        error!("{err}")
-                    },
+            if skip_download && should_parse {
+                // since we are skipping the download we will just read the file off the disk to
+                // parse it
+                if let Ok(mut file) = tokio::fs::OpenOptions::new()
+                    .read(true)
+                    .open(&real_path).await
+                {
+                    if let Err(err) = file.read_to_end(&mut buf).await {
+                        warn!("Failed to read file off disk for parsing, {}", err);
+                    }
                 }
             }
-            let _ = writer.flush().await;
 
-            // rename the temp file into the real file name
-            if let Err(err) = tokio::fs::rename(&tmp_path, &path).await {
-                error!(
-                    from = &*tmp_path.to_string_lossy(),
-                    to = &*path.to_string_lossy(),
-                    "Error renaming file: {}",
-                    err
-                );
-            }
+            // !!!DOWNLOADING TIME!!!
+            if !skip_download {
+                let mut writer = BufWriter::new(file);
 
-            // stream_span.end();
-            BEING_STREAMED.add(-1, &[]);
+                // Write file to disk
+                trace!("Writing at: {:?}", tmp_path);
+                BEING_STREAMED.add(1, &[]);
+                // let mut stream_span = TRACER.start("Stream");
+                while let Some(data) = stream.next().await {
+                    match data {
+                        Ok(data) => {
+                            TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
+                            let _ = writer.write_all(&data).await;
+                            // If we are going to parse this file later, we will save it
+                            // into memory as well as the disk.
+                            // We do this because the data here might be incomplete
+                            if should_parse {
+                                data.iter().for_each(|f| buf.push(*f));
+                            }
+                        }
+                        Err(err) => {
+                            error!("{err}")
+                        }
+                    }
+                }
+                let _ = writer.flush().await;
+
+                // rename the temp file into the real file name
+                if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
+                    error!(
+                        from = &*tmp_path.to_string_lossy(),
+                        to = &*real_path.to_string_lossy(),
+                        "Error renaming file: {}",
+                        err
+                    );
+                }
+
+                // stream_span.end();
+                BEING_STREAMED.add(-1, &[]);
+            }
 
             // (If needed) Parse the file
             if should_parse {
@@ -306,44 +339,77 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
             // parsing_span.end();
             BEING_PARSED.add(-1, &[]);
         } else {
-            trace!("Did not parse: {}", site.site.as_str());
+            trace!(url = site.site.as_str(), "Parse = False");
         }
 
         // update self in db
         site.crawled = true;
         site.status_code = code.as_u16();
         Website::store_all(vec![site.clone()], &db).await;
-        } else {
-            error!("File failed to cooperate: {:?}", path);
         }
     } else {
-        error!("Failed to get: {}", &site.site);
+        error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
     }
 
     // process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }
 
-/// Returns uncrawled links
-#[instrument(skip(db, config))]
-async fn get_uncrawled_links(
-    db: &Surreal<Client>,
-    mut count: usize,
-    config: &Config,
-) -> Vec<Website> {
-    if count > config.batch_size {
-        count = config.batch_size;
-    }
-
-    debug!("Getting {} uncrawled links from DB", count);
-    let mut response = db
-        .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
-        .bind(("format", config.crawl_filter.to_string()))
-        .bind(("count", count))
-        .await
-        .expect("Hard-coded query failed..?");
-    response
-        .take(0)
-        .expect("Returned websites couldn't be parsed")
+fn load_tracing(config: &Config) -> SdkTracerProvider {
+    // Send spans to Alloy (which will send them to Tempo)
+    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
+        .with_tonic()
+        .with_endpoint(config.tracing_endpoint.clone())
+        .build()
+        .unwrap();
+    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
+        .with_simple_exporter(otlp_span)
+        .build();
+    tracer_provider
+}
+
+fn load_logging(config: &Config) {
+    // let otlp_log = opentelemetry_otlp::LogExporter::builder()
+    //     .with_tonic()
+    //     .with_endpoint(endpoint)
+    //     .build()
+    //     .unwrap();
+    // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+    //     .with_simple_exporter(otlp_log)
+    //     .build();
+
+    let writer = std::fs::OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(config.log_file.clone())
+        .expect("Couldn't make log file!");
+
+    let filter = EnvFilter::builder()
+        .with_default_directive(LevelFilter::DEBUG.into())
+        .from_env_lossy();
+
+    let registry = Registry::default().with(
+        fmt::layer()
+            .with_line_number(true)
+            .with_thread_ids(true)
+            .with_file(true)
+            .json()
+            .with_writer(writer)
+            .with_filter(filter),
+    );
+
+    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
+}
+
+fn load_metrics(config: &Config) -> SdkMeterProvider {
+    // Send metrics to Prometheus
+    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
+        .with_http()
+        .with_protocol(Protocol::HttpBinary)
+        .with_endpoint(config.metrics_endpoint.clone())
+        .build()
+        .unwrap();
+    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
+        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
+        .build();
+    metrics_provider
 }

src/parser.rs

@@ -115,7 +115,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
             }
         }
     } else {
-        // # This is some sort of realative url, gonna try patching it up into an absolute
+        // # This is some sort of relative url, gonna try patching it up into an absolute
         // url
         match e {
             url::ParseError::RelativeUrlWithoutBase => {

src/setup.surql

@@ -4,6 +4,15 @@ DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string;
 DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
 DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
+DEFINE FIELD IF NOT EXISTS processing ON TABLE website TYPE bool DEFAULT false;
 DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
 DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
+
+DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
+    LET $site = SELECT * FROM ONLY website WHERE crawled = false AND processing = false AND site ~ type::string($filter) LIMIT 1;
+    UPDATE $site.id SET processing = true;
+    RETURN $site
+};
+
+UPDATE website SET processing = false WHERE processing = true;
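Taken together with db.rs, the intended flow is: each worker calls fn::get_next with its crawl_filter, the function claims one matching row by flipping processing to true, and the trailing UPDATE clears any claims left over from an earlier run. A minimal sketch of driving it by hand from a SurrealQL shell, using the filter example already present as a comment in main.rs:

-- Claim the next uncrawled, unclaimed page matching the filter
RETURN fn::get_next("en.wikipedia.org/");

-- Release claims left behind by a crashed or interrupted run
UPDATE website SET processing = false WHERE processing = true;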