Compare commits

...

4 Commits

SHA1 Message Date
76e78cc745 better logging 2025-07-16 16:02:16 -06:00
b4038b76dd fix prometheus lol 2025-07-16 16:02:07 -06:00
caa523f1eb cleanup 2025-07-16 11:48:23 -06:00
f7bb0eef16 turn program into batch_size parrallel downloaders 2025-07-16 11:47:42 -06:00
5 changed files with 68 additions and 72 deletions

View File

@@ -66,6 +66,7 @@ services:
- --enable-feature=native-histograms
- --web.enable-remote-write-receiver
- --web.enable-lifecycle
- --web.enable-otlp-receiver
- --config.file=/etc/prometheus/prometheus.yml
# Everything viewer
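
The new --web.enable-otlp-receiver flag makes Prometheus accept OTLP metrics pushed to its /api/v1/otlp/v1/metrics endpoint, which pairs with the opentelemetry-otlp imports seen in the Rust diff below. A minimal sketch of pointing an OTLP metric exporter at it, assuming a 0.28-era opentelemetry-otlp / opentelemetry_sdk builder API and a local Prometheus on port 9090 (both assumptions, not taken from this diff):

use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};

fn init_metrics() -> SdkMeterProvider {
    // Assumed endpoint: Prometheus serves its OTLP receiver here once the
    // compose flag above is enabled.
    let exporter = MetricExporter::builder()
        .with_http()
        .with_endpoint("http://localhost:9090/api/v1/otlp/v1/metrics")
        .with_protocol(Protocol::HttpBinary)
        .build()
        .expect("failed to build OTLP metric exporter");

    SdkMeterProvider::builder()
        .with_reader(PeriodicReader::builder(exporter).build())
        .build()
}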

View File

@@ -56,7 +56,7 @@ impl Website {
counter!(STORE).increment(1);
let mut things = Vec::with_capacity(all.len());
// FIXME failes *sometimes* because "Resource Busy"
// FIXME fails *sometimes* because "Resource Busy"
match db
.query(
"INSERT INTO website $array

View File

@@ -2,7 +2,7 @@ use std::{io::ErrorKind, path::PathBuf};
use reqwest::header::HeaderValue;
use tokio::fs;
use tracing::{error, trace, warn};
use tracing::{error, event, trace, warn, Level};
use url::Url;
pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
@@ -18,7 +18,7 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
};
if let Some((ttype, subtype)) = ttype.split_once('/') {
trace!("Found Content-Type to be: {ttype}/{subtype} for {}", url.to_string());
trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
// If the Content-Type header is "*/html" (most likely "text/html") and the path's
// extension is anything but html:
if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
@@ -29,7 +29,7 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
} else {
warn!("Header: {:?} couldn't be parsed into a string!", content_type);
}
trace!("Final path for {} is: {:?}", url, url_path);
trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
url_path
}
@@ -49,7 +49,6 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
// create the folders
if let Err(err) = fs::create_dir_all(&parent).await {
error!("Dir creation: {err} {:?}", filename);
eprintln!("{}", err)
} else if let Ok(ok) = file().await {
return Some(ok);
}
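
The changes in this file replace format-string-only logging with tracing's structured fields, so values like the URL become filterable keys instead of text baked into the message. A minimal sketch of the difference:

use tracing::trace;
use url::Url;

fn log_style(url: &Url) {
    // Before: the URL exists only inside the rendered message string.
    trace!("Final path for {} is computed", url);

    // After: `url` is a structured field a log collector can query on.
    trace!(url = url.as_str(), "Converted URL into path");
}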

View File

@@ -1,12 +1,11 @@
#![feature(ip_from)]
#![feature(path_add_extension)]
#![warn(clippy::expect_used)]
#![deny(clippy::unwrap_used)]
extern crate html5ever;
use std::{
collections::HashSet, fs::File, io::Read, sync::LazyLock
collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
};
use futures_util::StreamExt;
@@ -15,7 +14,7 @@ use opentelemetry_otlp::{Protocol, WithExportConfig};
use db::{connect, Website};
use serde::Deserialize;
use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet};
use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
@@ -57,7 +56,8 @@ static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
.build()
);
static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
// FIXME Traces aren't working on multiple threads, they block
// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
#[derive(Deserialize)]
struct Config {
@@ -137,18 +137,16 @@ async fn main() {
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
// End LOGGING
info!("Starting...");
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let mut crawled = 0;
let crawled = Arc::new(RwLock::new(0));
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
let config: Arc<Config> = Arc::new(toml::from_str(&buf).expect("Failed to parse Crawler.toml"));
let starting_url = &config.start_url;
let db = connect(&config)
@@ -163,58 +161,48 @@ async fn main() {
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work.
let mut span = TRACER.start("Pre-Loop");
// let mut span = TRACER.start("Pre-Loop");
let site = Website::new(starting_url, false);
process(site, db.clone(), reqwest.clone()).await;
span.end();
// Download the site
// span.end();
let mut main_loop_span= TRACER.start("Main-Loop");
while crawled < config.budget {
let uncrawled =
get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
if uncrawled.is_empty() {
info!("Had more budget but finished crawling everything.");
return;
}
{
let mut futures = JoinSet::new();
for site in uncrawled {
futures.spawn(process(site, db.clone(), reqwest.clone()));
}
// As futures complete runs code in while block
while futures.join_next().await.is_some() {
SITES_CRAWLED.add(1, &[]);
crawled += 1;
}
}
}
main_loop_span.end();
if let Ok(mut ok) = db
.query("count(select id from website where crawled = true)")
.await
{
let res = ok.take::<Option<usize>>(0);
if let Ok(Some(n)) = res {
info!("Total crawled pages now equals {n}");
}
// let mut main_loop_span= TRACER.start("Main-Loop");
let mut futures = JoinSet::new();
for _ in 0..config.batch_size {
futures.spawn(process_single_thread(config.clone(), db.clone(), reqwest.clone(), crawled.clone()));
}
futures.join_all().await;
// main_loop_span.end();
info!("Done");
}
async fn process_single_thread(config: Arc<Config>, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
while *(crawled.read().await) < config.budget {
let uncrawled = get_uncrawled_links(&db.clone(), 1, &config).await;
if uncrawled.is_empty() {
return
}
for site in uncrawled {
process(site, db.clone(), reqwest.clone()).await;
SITES_CRAWLED.add(1, &[]);
// Somehow this write doesn't hang on the while's read?
let mut c = crawled.write().await;
*c += 1;
}
}
}
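
The comment above asks why the write doesn't hang on the while's read: the read guard produced by crawled.read().await is a temporary that is dropped as soon as the < comparison finishes, so no read lock is held inside the loop body. A stripped-down illustration:

use std::sync::Arc;
use tokio::sync::RwLock;

async fn count_up(crawled: Arc<RwLock<usize>>, budget: usize) {
    while *(crawled.read().await) < budget {
        // The read guard is already dropped here.
        let mut c = crawled.write().await; // acquires without deadlock
        *c += 1;
    } // write guard released at the end of each iteration
}

Since the shared state is just a counter, an AtomicUsize with fetch_add would express the same thing without any lock, though the RwLock version reads more like the budget check it guards.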
#[instrument(skip(db, reqwest))]
/// Downloads and crawls and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// METRICS
trace!("Process: {}", &site.site);
trace!(url = &site.site.as_str(), "Process: {}", &site.site);
BEING_PROCESSED.add(1, &[]);
let mut process_span = TRACER.start("Process");
// let mut process_span = TRACER.start("Process");
// Build the request
let request_builder = reqwest.get(site.site.to_string());
@@ -226,6 +214,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
let headers = response.headers();
let code = response.status();
if code != 200 {
warn!("{code} for {}", site.site.as_str());
}
#[allow(non_snake_case)]
let CT = headers.get("Content-Type");
@@ -261,9 +252,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
let mut buf: Vec<u8> = Vec::new();
// Write file to disk
trace!("Writing at: {:?}", path);
trace!("Writing at: {:?}", tmp_path);
BEING_STREAMED.add(1, &[]);
let mut stream_span = TRACER.start("Stream");
// let mut stream_span = TRACER.start("Stream");
while let Some(data) = stream.next().await {
match data {
Ok(data) => {
@@ -277,23 +268,28 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
}
},
Err(err) => {
eprintln!("{}", err)
error!("{err}")
},
}
}
let _ = writer.flush();
let _ = writer.flush().await;
// rename the temp file into the real file name
if let Err(err) = tokio::fs::rename(tmp_path, path).await {
error!("{}", err);
if let Err(err) = tokio::fs::rename(&tmp_path, &path).await {
error!(
from = &*tmp_path.to_string_lossy(),
to = &*path.to_string_lossy(),
"Error renaming file: {}",
err
);
}
stream_span.end();
// stream_span.end();
BEING_STREAMED.add(-1, &[]);
// (If needed) Parse the file
if should_parse {
BEING_PARSED.add(1, &[]);
let mut parsing_span = TRACER.start("Parsing");
// let mut parsing_span = TRACER.start("Parsing");
// Parse document and get relationships
let sites = parser::parse(&site, &buf).await;
@@ -309,8 +305,10 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
parsing_span.end();
// parsing_span.end();
BEING_PARSED.add(-1, &[]);
} else {
trace!("Did not parse: {}", site.site.as_str());
}
// update self in db
@@ -324,7 +322,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
error!("Failed to get: {}", &site.site);
}
process_span.end();
// process_span.end();
BEING_PROCESSED.add(-1, &[]);
}
@@ -333,7 +331,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
async fn get_uncrawled_links(
db: &Surreal<Client>,
mut count: usize,
filter: String,
config: &Config,
) -> Vec<Website> {
if count > config.batch_size {
@@ -344,7 +341,7 @@
let mut response = db
.query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
.bind(("format", filter))
.bind(("format", config.crawl_filter.to_string()))
.bind(("count", count))
.await
.expect("Hard-coded query failed..?");
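
get_uncrawled_links now reads the crawl filter from the shared config instead of a separate argument. The Config field names (start_url, budget, batch_size, crawl_filter) are all visible in this diff; the concrete types and TOML values below are assumptions for illustration:

use serde::Deserialize;

#[derive(Deserialize)]
struct Config {
    start_url: String,
    budget: usize,
    batch_size: usize,
    crawl_filter: String,
}

fn load() -> Config {
    // Hypothetical Crawler.toml contents; only the keys are evidenced above.
    let raw = r#"
        start_url = "https://en.wikipedia.org"
        budget = 50
        batch_size = 4
        crawl_filter = "en.wikipedia.org/"
    "#;
    toml::from_str(raw).expect("Failed to parse Crawler.toml")
}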

View File

@@ -4,7 +4,7 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*};
use tracing::{debug, error, instrument, trace, warn};
use tracing::{error, instrument, trace, warn};
use url::Url;
use crate::db::Website;
@@ -12,7 +12,6 @@ use crate::db::Website;
impl TokenSink for Website {
type Handle = Vec<Website>;
#[instrument(skip(token, _line_number))]
fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
match token {
TagToken(tag) => {
@@ -33,13 +32,13 @@ impl TokenSink for Website {
let attr_name = attr.name.local.to_string();
if attr_name == "src" || attr_name == "href" || attr_name == "data"
{
trace!("Found `{}` in html `{}` tag", &attr.value, tag.name);
trace!(url = self.site.as_str(),"Found `{}` in html `{}` tag", &attr.value, tag.name);
let url = try_get_url(&self.site, &attr.value);
if let Some(mut parsed) = url {
parsed.set_query(None);
parsed.set_fragment(None);
trace!("Final cleaned URL: `{}`", parsed.to_string());
trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string());
let web = Website::new(&parsed.to_string(), false);
links.push(web);
}
@@ -60,10 +59,10 @@ impl TokenSink for Website {
}
}
#[instrument(skip_all)]
#[instrument(skip(data))]
/// Parses the passed site and returns all the sites it links to.
pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
trace!("Parsing {}", site.site.to_string());
trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
// prep work
let mut other_sites: Vec<Website> = Vec::new();
@@ -88,7 +87,7 @@ pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
assert!(token_buffer.is_empty());
tokenizer.end();
} else {
warn!("Tendril failed to parse on: {}", site.site.to_string());
warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
}
other_sites
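
Background for the try_get_url hunks below: url::Url::parse only accepts absolute URLs, so fragments, protocol-relative links, and bare paths all land in the Err arm and need the fallbacks the diff touches. A quick illustration (example.com is a placeholder):

use url::Url;

fn main() {
    let parent = Url::parse("https://example.com/page").unwrap();

    // Absolute URLs parse directly.
    assert!(Url::parse("https://example.com/a").is_ok());

    // Fragments and protocol-relative links are rejected as relative.
    assert!(Url::parse("#section").is_err());
    assert!(Url::parse("//cdn.example.com/x.js").is_err());

    // Prepending the parent's scheme recovers the protocol-relative case.
    let fixed = format!("{}://cdn.example.com/x.js", parent.scheme());
    assert!(Url::parse(&fixed).is_ok());
}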
@@ -100,7 +99,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
Ok(ok) => Some(ok),
Err(e) => {
if link.starts_with('#') {
trace!("Rejecting # url");
trace!(url = parent.as_str(), "Rejecting # url");
None
} else if link.starts_with("//") {
// if a url starts with "//" is assumed that it will adopt
@@ -108,7 +107,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
// https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
let scheme = parent.scheme();
match Url::parse(&format!("{scheme}://{}", link)) {
match Url::parse(&format!("{scheme}://{link}")) {
Ok(url) => Some(url),
Err(err) => {
error!("Failed parsing relative scheme url: {}", err);
@@ -128,7 +127,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
let url = origin.clone() + link;
if let Ok(url) = Url::parse(&url) {
trace!("Built `{url}` from `{origin} + `{}`", link.to_string());
trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string());
Some(url)
} else {
error!(