Compare commits


No commits in common. "76e78cc74511d57c8784bbf7e72d6944c9378cd1" and "865f9be8c001ae89d7cd11b58c5caeb3bb1924eb" have entirely different histories.

5 changed files with 72 additions and 68 deletions

View File

@@ -66,7 +66,6 @@ services:
       - --enable-feature=native-histograms
       - --web.enable-remote-write-receiver
       - --web.enable-lifecycle
-      - --web.enable-otlp-receiver
       - --config.file=/etc/prometheus/prometheus.yml
   # Everything viewer

View File

@@ -56,7 +56,7 @@ impl Website {
         counter!(STORE).increment(1);
         let mut things = Vec::with_capacity(all.len());
-        // FIXME fails *sometimes* because "Resource Busy"
+        // FIXME failes *sometimes* because "Resource Busy"
         match db
             .query(
                 "INSERT INTO website $array

View File

@@ -2,7 +2,7 @@ use std::{io::ErrorKind, path::PathBuf};
 use reqwest::header::HeaderValue;
 use tokio::fs;
-use tracing::{error, event, trace, warn, Level};
+use tracing::{error, trace, warn};
 use url::Url;

 pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
@@ -18,7 +18,7 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     };

     if let Some((ttype, subtype)) = ttype.split_once('/') {
-        trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
+        trace!("Found Content-Type to be: {ttype}/{subtype} for {}", url.to_string());
         // If the Content-Type header is "*/html" (most likely "text/html") and the path's
         // extension is anything but html:
         if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
@@ -29,7 +29,7 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     } else {
         warn!("Header: {:?} couldn't be parsed into a string!", content_type);
     }
-    trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
+    trace!("Final path for {} is: {:?}", url, url_path);

     url_path
 }
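Most of the logging changes in this diff (here and in main.rs and parser.rs below) trade tracing's structured key/value fields for plain format-string interpolation, or vice versa. A minimal sketch of the difference, using a made-up message and assuming only the tracing and url crates:

```rust
use tracing::trace;
use url::Url;

fn log_path_conversion(url: &Url) {
    // Structured style (left side of this diff): `url` becomes a field that
    // subscribers can filter and index on, separate from the message text.
    trace!(url = url.as_str(), "Converted URL into path");

    // Format-string style (right side): the URL is interpolated into the
    // message itself and is no longer queryable as a distinct field.
    trace!("Final path for {} is being computed", url);
}
```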
@@ -49,6 +49,7 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
         // create the folders
         if let Err(err) = fs::create_dir_all(&parent).await {
             error!("Dir creation: {err} {:?}", filename);
+            eprintln!("{}", err)
         } else if let Ok(ok) = file().await {
             return Some(ok);
         }

View File

@@ -1,11 +1,12 @@
 #![feature(ip_from)]
 #![feature(path_add_extension)]
+#![warn(clippy::expect_used)]
 #![deny(clippy::unwrap_used)]

 extern crate html5ever;

 use std::{
-    collections::HashSet, fs::File, io::Read, sync::{Arc, LazyLock}
+    collections::HashSet, fs::File, io::Read, sync::LazyLock
 };
 use futures_util::StreamExt;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -14,7 +15,7 @@ use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
+use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet};
 use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
@@ -56,8 +57,7 @@ static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
         .build()
 );
-// FIXME Traces aren't working on multiple threads, they block
-// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
+static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));

 #[derive(Deserialize)]
 struct Config {
@@ -137,16 +137,18 @@ async fn main() {
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
     // End LOGGING

+    info!("Starting...");
+
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
-    let crawled = Arc::new(RwLock::new(0));
+    let mut crawled = 0;

     let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
     let mut buf = String::new();
     let _ = file.read_to_string(&mut buf);
-    let config: Arc<Config> = Arc::new(toml::from_str(&buf).expect("Failed to parse Crawler.toml"));
+    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
     let starting_url = &config.start_url;

     let db = connect(&config)
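The Crawler.toml parsed here populates the Config struct used throughout this diff. The struct body is not part of any hunk, so the following is a hypothetical reduction containing only the fields that actually appear in the diff (start_url, budget, batch_size, crawl_filter), parsed the same way with toml::from_str:

```rust
use serde::Deserialize;

// Hypothetical reduction of the crawler's Config: only fields visible in
// this diff are included; DB-connection fields and any others are omitted.
#[derive(Deserialize)]
struct Config {
    start_url: String,
    budget: usize,
    batch_size: usize,
    crawl_filter: String,
}

fn main() {
    // Values echo the commented-out examples in the hunk above.
    let buf = r#"
        start_url = "https://en.wikipedia.org"
        budget = 50
        batch_size = 2
        crawl_filter = "en.wikipedia.org/"
    "#;
    let config: Config = toml::from_str(buf).expect("Failed to parse Crawler.toml");
    assert_eq!(config.budget, 50);
}
```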
@@ -161,48 +163,58 @@
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
-    // let mut span = TRACER.start("Pre-Loop");
+    let mut span = TRACER.start("Pre-Loop");
     let site = Website::new(starting_url, false);
     process(site, db.clone(), reqwest.clone()).await;
-    // span.end();
+    span.end();

-    // let mut main_loop_span= TRACER.start("Main-Loop");
-    let mut futures = JoinSet::new();
-    for _ in 0..config.batch_size {
-        futures.spawn(process_single_thread(config.clone(), db.clone(), reqwest.clone(), crawled.clone()));
-    }
-    futures.join_all().await;
-    // main_loop_span.end();
+    // Download the site
+    let mut main_loop_span= TRACER.start("Main-Loop");
+    while crawled < config.budget {
+        let uncrawled =
+            get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
+        if uncrawled.is_empty() {
+            info!("Had more budget but finished crawling everything.");
+            return;
+        }
+
+        {
+            let mut futures = JoinSet::new();
+            for site in uncrawled {
+                futures.spawn(process(site, db.clone(), reqwest.clone()));
+            }
+            // As futures complete runs code in while block
+            while futures.join_next().await.is_some() {
+                SITES_CRAWLED.add(1, &[]);
+                crawled += 1;
+            }
+        }
+    }
+    main_loop_span.end();
+
+    if let Ok(mut ok) = db
+        .query("count(select id from website where crawled = true)")
+        .await
+    {
+        let res = ok.take::<Option<usize>>(0);
+        if let Ok(Some(n)) = res {
+            info!("Total crawled pages now equals {n}");
+        }
+    }

     info!("Done");
 }

-async fn process_single_thread(config: Arc<Config>, db: Surreal<Client>, reqwest: reqwest::Client, crawled: Arc<RwLock<usize>>) {
-    while *(crawled.read().await) < config.budget {
-        let uncrawled = get_uncrawled_links(&db.clone(), 1, &config).await;
-        if uncrawled.is_empty() {
-            return
-        }
-        for site in uncrawled {
-            process(site, db.clone(), reqwest.clone()).await;
-            SITES_CRAWLED.add(1, &[]);
-            // Somehow this write doesn't hang on the while's read?
-            let mut c = crawled.write().await;
-            *c += 1;
-        }
-    }
-}

 #[instrument(skip(db, reqwest))]
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // METRICS
-    trace!(url = &site.site.as_str(), "Process: {}", &site.site);
+    trace!("Process: {}", &site.site);
     BEING_PROCESSED.add(1, &[]);
-    // let mut process_span = TRACER.start("Process");
+    let mut process_span = TRACER.start("Process");

     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
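The heart of this hunk is a change of scheduling strategy: the base side spawns batch_size long-lived workers that share a budget counter behind an Arc<RwLock<usize>>, while the head side crawls in rounds, fetching a batch of uncrawled links and joining every task before querying again. A stripped-down sketch of both shapes, with hypothetical types and the actual crawling stubbed out:

```rust
use std::sync::Arc;
use tokio::{sync::RwLock, task::JoinSet};

// Stub standing in for process(); the real arguments (db, reqwest) are omitted.
async fn process_one(_site: String) {}

// Base side: N workers race against a shared budget until it is exhausted.
async fn worker_model(sites: Vec<String>, budget: usize, workers: usize) {
    let crawled = Arc::new(RwLock::new(0usize));
    let sites = Arc::new(RwLock::new(sites));
    let mut futures = JoinSet::new();
    for _ in 0..workers {
        let (crawled, sites) = (crawled.clone(), sites.clone());
        futures.spawn(async move {
            while *crawled.read().await < budget {
                let Some(site) = sites.write().await.pop() else { return };
                process_one(site).await;
                // The read guard above is already dropped, so this write
                // does not deadlock against the loop condition.
                *crawled.write().await += 1;
            }
        });
    }
    futures.join_all().await;
}

// Head side: batched rounds; each round fully completes before the next query.
async fn batched_model(mut pending: Vec<String>, budget: usize) {
    let mut crawled = 0;
    while crawled < budget && !pending.is_empty() {
        let take = pending.len().min(budget - crawled);
        let mut futures = JoinSet::new();
        for site in pending.drain(..take) {
            futures.spawn(process_one(site));
        }
        // Count each completed task against the budget as it finishes.
        while futures.join_next().await.is_some() {
            crawled += 1;
        }
    }
}
```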
@@ -214,9 +226,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     let headers = response.headers();
     let code = response.status();

-    if code != 200 {
-        warn!("{code} for {}", site.site.as_str());
-    }

     #[allow(non_snake_case)]
     let CT = headers.get("Content-Type");
@@ -252,9 +261,9 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
     let mut buf: Vec<u8> = Vec::new();

     // Write file to disk
-    trace!("Writing at: {:?}", tmp_path);
+    trace!("Writing at: {:?}", path);
     BEING_STREAMED.add(1, &[]);
-    // let mut stream_span = TRACER.start("Stream");
+    let mut stream_span = TRACER.start("Stream");
     while let Some(data) = stream.next().await {
         match data {
             Ok(data) => {
@@ -268,28 +277,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                 }
             },
             Err(err) => {
-                error!("{err}")
+                eprintln!("{}", err)
             },
         }
     }
-    let _ = writer.flush().await;
+    let _ = writer.flush();
     // rename the temp file into the real file name
-    if let Err(err) = tokio::fs::rename(&tmp_path, &path).await {
-        error!(
-            from = &*tmp_path.to_string_lossy(),
-            to = &*path.to_string_lossy(),
-            "Error renaming file: {}",
-            err
-        );
+    if let Err(err) = tokio::fs::rename(tmp_path, path).await {
+        error!("{}", err);
     }

-    // stream_span.end();
+    stream_span.end();
     BEING_STREAMED.add(-1, &[]);

     // (If needed) Parse the file
     if should_parse {
         BEING_PARSED.add(1, &[]);
-        // let mut parsing_span = TRACER.start("Parsing");
+        let mut parsing_span = TRACER.start("Parsing");

         // Parse document and get relationships
         let sites = parser::parse(&site, &buf).await;
@@ -305,10 +309,8 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         // Store all the other sites so that we can link to them.
         let _ = Website::store_all(de_dupe_sites, &db).await;

-        // parsing_span.end();
+        parsing_span.end();
         BEING_PARSED.add(-1, &[]);
-    } else {
-        trace!("Did not parse: {}", site.site.as_str());
     }

     // update self in db
@@ -322,7 +324,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         error!("Failed to get: {}", &site.site);
     }

-    // process_span.end();
+    process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }
@@ -331,6 +333,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 async fn get_uncrawled_links(
     db: &Surreal<Client>,
     mut count: usize,
+    filter: String,
     config: &Config,
 ) -> Vec<Website> {
     if count > config.batch_size {
@@ -341,7 +344,7 @@ async fn get_uncrawled_links(
     let mut response = db
         .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
-        .bind(("format", config.crawl_filter.to_string()))
+        .bind(("format", filter))
         .bind(("count", count))
         .await
         .expect("Hard-coded query failed..?");

View File

@@ -4,7 +4,7 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult};
 use html5ever::tokenizer::{StartTag, TagToken};
 use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
 use html5ever::{local_name, tendril::*};
-use tracing::{error, instrument, trace, warn};
+use tracing::{debug, error, instrument, trace, warn};
 use url::Url;

 use crate::db::Website;
@@ -12,6 +12,7 @@ use crate::db::Website;
 impl TokenSink for Website {
     type Handle = Vec<Website>;

+    #[instrument(skip(token, _line_number))]
     fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
         match token {
             TagToken(tag) => {
@@ -32,13 +33,13 @@ impl TokenSink for Website {
                         let attr_name = attr.name.local.to_string();
                         if attr_name == "src" || attr_name == "href" || attr_name == "data"
                         {
-                            trace!(url = self.site.as_str(),"Found `{}` in html `{}` tag", &attr.value, tag.name);
+                            trace!("Found `{}` in html `{}` tag", &attr.value, tag.name);
                             let url = try_get_url(&self.site, &attr.value);

                             if let Some(mut parsed) = url {
                                 parsed.set_query(None);
                                 parsed.set_fragment(None);
-                                trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string());
+                                trace!("Final cleaned URL: `{}`", parsed.to_string());
                                 let web = Website::new(&parsed.to_string(), false);
                                 links.push(web);
                             }
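The set_query(None)/set_fragment(None) pair normalizes discovered links so the same page is not stored twice under different query strings or anchors. A self-contained demonstration of that cleaning step with the url crate (the URL itself is made up):

```rust
use url::Url;

fn main() {
    let mut parsed = Url::parse("https://example.com/page?utm_source=x#section-2")
        .expect("static URL parses");
    // Strip the tracking query string and the in-page anchor.
    parsed.set_query(None);
    parsed.set_fragment(None);
    // Prints: https://example.com/page
    println!("{parsed}");
}
```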
@@ -59,10 +60,10 @@ impl TokenSink for Website {
     }
 }

-#[instrument(skip(data))]
+#[instrument(skip_all)]
 /// Parses the passed site and returns all the sites it links to.
 pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
-    trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
+    trace!("Parsing {}", site.site.to_string());
     // prep work
     let mut other_sites: Vec<Website> = Vec::new();
@@ -87,7 +88,7 @@ pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
         assert!(token_buffer.is_empty());
         tokenizer.end();
     } else {
-        warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
+        warn!("Tendril failed to parse on: {}", site.site.to_string());
     }

     other_sites
@@ -99,7 +100,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
         Ok(ok) => Some(ok),
         Err(e) => {
             if link.starts_with('#') {
-                trace!(url = parent.as_str(), "Rejecting # url");
+                trace!("Rejecting # url");
                 None
             } else if link.starts_with("//") {
                 // if a url starts with "//" is assumed that it will adopt
@@ -107,7 +108,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
                 // https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
                 let scheme = parent.scheme();
-                match Url::parse(&format!("{scheme}://{link}")) {
+                match Url::parse(&format!("{scheme}://{}", link)) {
                     Ok(url) => Some(url),
                     Err(err) => {
                         error!("Failed parsing relative scheme url: {}", err);
@@ -127,7 +128,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
                 let url = origin.clone() + link;
                 if let Ok(url) = Url::parse(&url) {
-                    trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string());
+                    trace!("Built `{url}` from `{origin} + `{}`", link.to_string());
                     Some(url)
                 } else {
                     error!(