Commit 4989a59ddf ("checkpoint"), 2025-07-10 18:46:25 -06:00

17 changed files with 560 additions and 3138 deletions

.gitignore (vendored), 1 line changed

@@ -6,4 +6,3 @@ flamegraph.svg
perf.data.old perf.data.old
/docker/logs/* /docker/logs/*
/downloaded /downloaded
/Crawler.toml

.vscode/launch.json (vendored), 19 lines changed

@@ -7,15 +7,18 @@
{ {
"type": "lldb", "type": "lldb",
"request": "launch", "request": "launch",
"name": "Debug executable 'internet_mapper'", "name": "Debug executable 'surreal_spider'",
"env": {
"RUST_LOG": "surreal_spider=trace,reqwest=info",
},
"cargo": { "cargo": {
"args": [ "args": [
"build", "build",
"--bin=internet_mapper", "--bin=surreal_spider",
"--package=internet_mapper" "--package=surreal_spider"
], ],
"filter": { "filter": {
"name": "internet_mapper", "name": "surreal_spider",
"kind": "bin" "kind": "bin"
} }
}, },
@@ -25,16 +28,16 @@
{ {
"type": "lldb", "type": "lldb",
"request": "launch", "request": "launch",
"name": "Debug unit tests in executable 'internet_mapper'", "name": "Debug unit tests in executable 'surreal_spider'",
"cargo": { "cargo": {
"args": [ "args": [
"test", "test",
"--no-run", "--no-run",
"--bin=internet_mapper", "--bin=surreal_spider",
"--package=internet_mapper" "--package=surreal_spider"
], ],
"filter": { "filter": {
"name": "internet_mapper", "name": "surreal_spider",
"kind": "bin" "kind": "bin"
} }
}, },

.vscode/settings.json

@@ -3,6 +3,6 @@
"creds", "creds",
"reqwest", "reqwest",
"rustls", "rustls",
"surql", "surql"
] ]
} }

Cargo.lock (generated), 2569 lines changed

File diff suppressed because it is too large.

Cargo.toml

@@ -5,17 +5,14 @@ edition = "2021"
[dependencies] [dependencies]
base64 = "0.22.1" base64 = "0.22.1"
futures-util = "0.3.31"
html5ever = "0.29" html5ever = "0.29"
metrics = "0.24.1" metrics = "0.24.1"
metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]} metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
opentelemetry = "0.30.0" # minio = "0.1.0"
opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] } minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"}
opentelemetry_sdk = "0.30.0" reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls"] }
rand = "0.9.1" rusqlite = { version = "0.34.0", features = ["bundled"] }
reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
surrealdb = "2.2"
tokio = { version="1.41.0", features = ["full"] } tokio = { version="1.41.0", features = ["full"] }
toml = "0.8.20" toml = "0.8.20"
tracing = "0.1" tracing = "0.1"

Crawler.toml

@@ -1,22 +1,10 @@
# Visibility config
# Alloy (for Tempo)
tracing_endpoint = "http://localhost:4317"
# Prometheus
metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
# Alloy (for Loki)
log_file = "./docker/logs/tracing.log"
# Surreal config # Surreal config
surreal_url = "localhost:8000" surreal_url = "localhost:8000"
surreal_username = "root" surreal_username = "root"
surreal_password = "root" surreal_password = "root"
surreal_ns = "test" surreal_ns = "test"
surreal_db = "v1.21.1" surreal_db = "v1.19.2"
# Crawler config # Crawler config
crawl_filter = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/" crawl_filter = "en.wikipedia.com"
# crawl_filter = "https://oliveratkinson.net" budget = 1000
start_url = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
# start_url = "https://oliveratkinson.net"
budget = 100
batch_size = 2
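
The right-hand config drops the tracing/metrics endpoints, `start_url`, and `batch_size`, leaving only the SurrealDB credentials and the crawl settings. As a bridge to the `Config` struct further down in src/main.rs, here is a minimal sketch of how the trimmed file could be loaded with the `serde` and `toml` crates already in Cargo.toml (field names taken from the diff; error handling reduced to `expect` for brevity):

```rust
use serde::Deserialize;

// Keys mirror the right-hand Crawler.toml shown above.
#[derive(Deserialize)]
struct Config {
    surreal_url: String,
    surreal_username: String,
    surreal_password: String,
    surreal_ns: String,
    surreal_db: String,
    crawl_filter: String,
    budget: usize,
}

fn load_config() -> Config {
    let raw = std::fs::read_to_string("./Crawler.toml").expect("Failed to read Crawler.toml");
    toml::from_str(&raw).expect("Failed to parse Crawler.toml")
}
```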

README.md

@@ -2,56 +2,22 @@
Crawls sites saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links until the crawl budget is reached. It saves the data of each site in a minio database. Crawls sites saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links until the crawl budget is reached. It saves the data of each site in a minio database.
## How to use
1. Clone the repo and `cd` into it.
2. Build the repo with `cargo build -r`
3. Start the docker containers
1. cd into the docker folder `cd docker`
2. Bring up the docker containers `docker compose up -d`
4. From the project's root, edit the `Crawler.toml` file to your liking.
5. Run with `./target/release/internet_mapper`
You can view stats of the project at `http://<your-ip>:3000/dashboards`
```bash
# Untested script but probably works
git clone https://git.oliveratkinson.net/Oliver/internet_mapper.git
cd internet_mapper
cargo build -r
cd docker
docker compose up -d
cd ..
$EDITOR Crawler.toml
./target/release/internet_mapper
```
### TODO ### TODO
- [x] Domain filtering - prevent the crawler from going on alternate versions of wikipedia. - [ ] Domain filtering - prevent the crawler from going on alternate versions of wikipedia.
- [ ] Conditionally save content - based on filename or file contents - [ ] Conditionally save content - based on filename or file contents
- [x] GUI / TUI ? - Grafana - [x] GUI / TUI ? - Grafana
- [x] Better asynchronous getting of the sites. Currently it all happens serially. - [x] Better asynchronous getting of the sites. Currently it all happens serially.
- [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need - [ ] Allow for storing asynchronously
- [x] Control crawler via config file (no recompilation needed)
### Feats 3/17/25: Took >1hr to crawl 100 pages
3/17/25: Took >1hr to crawl 100 pages. 3/19/25: Took 20min to crawl 1000 pages
3/19/25: Took 20min to crawl 1000 pages.
This meant we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two. This meant we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
3/20/25: Took 5min to crawl 1000 pages. 3/20/25: Took 5min to crawl 1000 pages
3/21/25: Took 3min to crawl 1000 pages. 3/21/25: Took 3min to crawl 1000 pages
7/.../25: Downloaded just shy of 12TB of data from a remote server.
# About # About

docker/alloy.conf

@@ -12,25 +12,3 @@ loki.write "local_loki" {
url = "http://loki:3100/loki/api/v1/push" url = "http://loki:3100/loki/api/v1/push"
} }
} }
otelcol.receiver.otlp "otlp_receiver" {
grpc {
endpoint = "0.0.0.0:4317"
}
http {
endpoint = "0.0.0.0:4318"
}
output {
traces = [otelcol.exporter.otlp.tempo.input,]
}
}
otelcol.exporter.otlp "tempo" {
client {
endpoint = "tempo:4317"
tls {
insecure = true
}
}
}

docker/docker-compose.yml

@@ -1,6 +1,4 @@
services: services:
# Database
surreal: surreal:
image: surrealdb/surrealdb:latest-dev image: surrealdb/surrealdb:latest-dev
ports: ports:
@@ -17,18 +15,6 @@ services:
- root - root
- rocksdb:/mydata/database.db - rocksdb:/mydata/database.db
# Tracing
tempo:
image: grafana/tempo:latest
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./tempo.yaml:/etc/tempo.yaml
- tempo_storage:/var/tempo
ports:
- 3200:3200 # self metrics for prometheus
- 4317:4317 # otlp grpc - (alloy)
# Log scraper
alloy: alloy:
image: grafana/alloy:latest image: grafana/alloy:latest
ports: ports:
@@ -38,13 +24,9 @@ services:
- ./logs/:/tmp/alloy-logs - ./logs/:/tmp/alloy-logs
- ./alloy.conf:/etc/alloy/config.alloy - ./alloy.conf:/etc/alloy/config.alloy
- alloy_storage:/var/lib/alloy - alloy_storage:/var/lib/alloy
command: command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy
- run
- --server.http.listen-addr=0.0.0.0:12345
- --storage.path=/var/lib/alloy/data
- /etc/alloy/config.alloy
# Log storage / analysis #logs
loki: loki:
image: grafana/loki:latest image: grafana/loki:latest
ports: ports:
@@ -53,21 +35,16 @@ services:
volumes: volumes:
- ./loki.yaml:/etc/loki/local-config.yaml - ./loki.yaml:/etc/loki/local-config.yaml
# Metrics # Metrics collector
prometheus: prometheus:
image: prom/prometheus:latest image: prom/prometheus:latest
ports: expose:
- 9090:9090 - 9090
volumes: volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml - ./prometheus.yaml:/etc/prometheus/prometheus.yml
# persist data # persist data
# - prometheus_storage:/prometheus - prometheus_storage:/prometheus
command: command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml
- --enable-feature=native-histograms
- --web.enable-remote-write-receiver
- --web.enable-lifecycle
- --web.enable-otlp-receiver
- --config.file=/etc/prometheus/prometheus.yml
# Everything viewer # Everything viewer
grafana: grafana:
@@ -89,4 +66,4 @@ volumes:
grafana_storage: grafana_storage:
alloy_storage: alloy_storage:
surrealdb_storage: surrealdb_storage:
tempo_storage: minio_storage:

Grafana datasource provisioning (YAML)

@@ -22,20 +22,3 @@ datasources:
editable: false editable: false
jsonData: jsonData:
httpMethod: GET httpMethod: GET
- name: Tempo
type: tempo
access: proxy
orgId: 1
url: http://tempo:3200
basicAuth: false
isDefault: false
version: 1
editable: true
apiVersion: 1
uid: tempo
jsonData:
httpMethod: GET
serviceMap:
datasourceUid: prometheus
streamingEnabled:
search: true

docker/prometheus.yaml

@@ -1,15 +1,17 @@
global: global:
scrape_interval: 60s scrape_interval: 5s
query_log_file: /etc/prometheus/query.log query_log_file: /etc/prometheus/query.log
scrape_configs: scrape_configs:
# Crawler configs get pushed with OTLP - job_name: crawler
# - job_name: 'loki'
# static_configs:
# - targets: ['loki:3100']
# - job_name: 'prometheus'
# static_configs:
# - targets: ['localhost:9090']
- job_name: 'tempo'
static_configs: static_configs:
- targets: ['tempo:3200'] # change this to your machine's ip, localhost won't work
# because localhost refers to the docker container.
- targets: ['172.20.239.48:2500']
#- targets: ['192.168.8.209:2500']
- job_name: loki
static_configs:
- targets: ['loki:3100']
- job_name: prometheus
static_configs:
- targets: ['localhost:9090']
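
The new `crawler` job scrapes the crawler process itself on port 2500, using the machine's IP because `localhost` inside the container refers to the container. The listener it expects is the Prometheus exporter installed in src/main.rs; a minimal sketch of that setup (intended to be called from inside the existing Tokio runtime, as `main()` does):

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use metrics::counter;
use metrics_exporter_prometheus::PrometheusBuilder;

fn install_metrics_exporter() {
    // Bind 0.0.0.0:2500 so the Prometheus container can reach the host.
    PrometheusBuilder::new()
        .with_http_listener(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 2500))
        .install()
        .expect("failed to install recorder/exporter");

    // Anything recorded afterwards is served at http://<host-ip>:2500/metrics.
    counter!("total_gets").increment(1);
}
```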

docker/tempo.yaml (file deleted)

@@ -1,48 +0,0 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info
query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
metadata_slo:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: "tempo:4317"
metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces
storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks
overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both

src/db.rs, 113 lines changed

@@ -1,13 +1,7 @@
use metrics::counter; use metrics::counter;
use rusqlite::Connection;
use std::fmt::Debug;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt::Debug, time::Duration};
use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
sql::Thing,
Surreal,
};
use tokio::time::sleep;
use tracing::{error, instrument, trace}; use tracing::{error, instrument, trace};
use url::Url; use url::Url;
@@ -17,23 +11,16 @@ const STORE: &str = "surql_store_calls";
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
pub struct Website { pub struct Website {
pub id: Option<Thing>,
/// The url that this data is found at /// The url that this data is found at
pub site: Url, pub site: Url,
/// Wether or not this link has been crawled yet /// Wether or not this link has been crawled yet
pub crawled: bool, pub crawled: bool,
/// 200, 404, etc
pub status_code: u16,
} }
// manual impl to make tracing look nicer // manual impl to make tracing look nicer
impl Debug for Website { impl Debug for Website {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Website") f.debug_struct("Website").field("site", &self.site).finish()
.field("host", &self.site.host())
.field("path", &self.site.path())
.field("status_code", &self.status_code)
.finish()
} }
} }
@@ -46,26 +33,32 @@ impl Website {
}; };
Self { Self {
crawled, crawled,
site, site
status_code: 0,
id: None,
} }
} }
pub fn set_crawled(&mut self) {
trace!("Set crawled to true");
self.crawled = true
}
// Insert ever item in the vec into surreal, crawled state will be preserved as TRUE // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
// if already in the database as such or incoming data is TRUE. // if already in the database as such or incoming data is TRUE.
#[instrument(skip(db))] pub async fn store_all(all: Vec<Self>, db: &Connection) {
pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
counter!(STORE).increment(1); counter!(STORE).increment(1);
let mut things = Vec::with_capacity(all.len()); let mut things = Vec::with_capacity(all.len());
rusqlite::ParamsFromIter;
db.execute("",
params![]
);
match db match db
.query( .query(
"INSERT INTO website $array "INSERT INTO website $array
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
accessed_at = time::now(), accessed_at = time::now(),
status_code = $input.status_code,
processing = false,
crawled = crawled OR $input.crawled crawled = crawled OR $input.crawled
RETURN VALUE id; RETURN VALUE id;
", ",
@@ -85,72 +78,22 @@ impl Website {
} }
} }
/// Returns uncrawled links
#[instrument(skip(db, config))]
pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website> {
let mut res: Option<Website> = None;
let mut fails = 0;
while res == None {
let mut response = db
.query("fn::get_next($format)")
.bind(("format", config.crawl_filter.to_string()))
.await
.expect("Hard-coded query failed..?");
res = match response.take(0) {
Ok(ok) => ok,
Err(_err) => {
// basically just CSMA/CA
let delay = rand::random_range(10..10_000);
sleep(Duration::from_millis(delay)).await;
fails += 1;
// Don't get stuck here forever, failing...
// (most I've seen is 1)
if fails > 5 {
error!("Max attempts to get_next() reached... ({fails})");
return None
}
None
}
};
}
res
}
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[allow(dead_code)]
pub struct Email { pub struct Email {
pub email: String, pub email: String,
pub on: String, pub on: String,
} }
#[instrument(skip_all, name = "SurrealDB")] #[derive(Debug, Deserialize)]
pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> { pub struct Record {
trace!("Establishing connection to surreal..."); #[allow(dead_code)]
// Connect to the server pub id: Thing,
let db = Surreal::new::<Ws>(&config.surreal_url).await?;
trace!("Logging in...");
// Signin as a namespace, database, or root user
db.signin(Root {
username: &config.surreal_username,
password: &config.surreal_password,
})
.await?;
// Select a specific namespace / database
db.use_ns(&config.surreal_ns)
.use_db(&config.surreal_db)
.await?;
let setup = include_bytes!("setup.surql");
let init_commands = setup.iter().map(|c| *c as char).collect::<String>();
db.query(init_commands)
.await
.expect("Failed to setup surreal tables.");
Ok(db)
} }
#[instrument(skip_all, name = "sqlite_connect")]
pub async fn connect(config: &Config) -> Result<Connection, rusqlite::Error> {
trace!("Establishing connection to sqlite...");
// Connect to the server
Connection::open("./squeelite.db")
}
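
The right-hand `store_all` is still a stub: it calls `db.execute` with an empty statement and then falls through to the old SurrealDB query. As a point of comparison only, here is a minimal sketch of the equivalent upsert in rusqlite, assuming a `website(site TEXT UNIQUE, crawled BOOL)` table and reusing the `Website` struct from this file; rusqlite is synchronous, so the async wrapper is dropped:

```rust
use rusqlite::{params, Connection};

/// Sketch only: keep crawled = TRUE if either the stored row or the incoming
/// row says so, mirroring `crawled = crawled OR $input.crawled` above.
fn store_all(all: Vec<Website>, db: &Connection) -> rusqlite::Result<()> {
    let mut stmt = db.prepare(
        "INSERT INTO website (site, crawled) VALUES (?1, ?2)
         ON CONFLICT(site) DO UPDATE SET crawled = crawled OR excluded.crawled",
    )?;
    for site in all {
        stmt.execute(params![site.site.as_str(), site.crawled])?;
    }
    Ok(())
}
```

The `ON CONFLICT(site)` clause needs a UNIQUE constraint on `site`, matching the unique index the SurrealDB schema defines in setup.surql.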

src/filesystem.rs

@@ -1,105 +1,70 @@
use std::{io::ErrorKind, path::PathBuf}; use std::{ffi::OsStr, path::PathBuf};
use reqwest::header::HeaderValue;
use tokio::fs; use tokio::fs;
use tracing::{error, trace, warn}; use tracing::{debug, error, instrument, trace, warn};
use url::Url; use url::Url;
pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf { #[instrument(skip(data))]
pub async fn store(data: &str, url: &Url) {
// extract data from url to save it accurately // extract data from url to save it accurately
let mut url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path()); let url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
if let Ok(header) = content_type.to_str() { // if it's a file
// text/html; charset=UTF-8; option=value let (basepath, filename) = if url_path.extension().filter(valid_file_extension).is_some() {
let ttype = if let Some((t, _)) = header.split_once(';') { // get everything up till the file
t let basepath = url_path.ancestors().skip(1).take(1).collect::<PathBuf>();
// get the file name
let filename = url_path.file_name().expect("This should exist").to_string_lossy();
trace!("Save path: {:?} and base path: {:?}", &url_path, &basepath);
(basepath, filename.to_string())
} else { } else {
header (url_path.clone(), "index.html".into())
}; };
if let Some((ttype, subtype)) = ttype.split_once('/') { debug!("Writing at: {:?} {:?}", basepath, filename);
trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
// If the Content-Type header is "*/html" (most likely "text/html") and the path's
// extension is anything but html:
if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
// time to slap a index.html to the end of that path there!
url_path = url_path.join("index.html");
}
}
} else {
warn!("Header: {:?} couldn't be parsed into a string!", content_type);
}
trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
url_path
}
pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
match tokio::fs::OpenOptions::new()
.write(false)
.read(true)
.create(false)
.open(file).await
{
Ok(file) => {
match file.metadata().await {
Ok(meta) => {
return Some(meta.len())
},
Err(err) => {
error!("Failed to get metadata. {}", err)
},
}
},
Err(err) => {
match err.kind() {
ErrorKind::NotFound => {/* ignore */},
_ => warn!("Failed to open file to check length. {:?} {}", file, err),
}
},
}
None
}
pub async fn init(filename: &PathBuf) -> Option<fs::File> {
let file = async || tokio::fs::OpenOptions::new()
.write(true)
.append(false)
.create(true)
.open(&filename).await;
match file().await {
Ok(ok) => {
trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
Some(ok)
},
Err(err) => {
// the file/folder isn't found
if err.kind() == ErrorKind::NotFound {
if let Some(parent ) = &filename.parent() {
// create the folders // create the folders
if let Err(err) = fs::create_dir_all(&parent).await { if let Err(err) = fs::create_dir_all(&basepath).await {
error!("Dir creation: {err} {:?}", filename); error!("Dir creation: {err} {:?}", basepath);
} else if let Ok(ok) = file().await {
return Some(ok);
}
} else { } else {
error!("Couldn't get file's parents: {:?}", &filename); // FIXME I don't think this handles index.html files well...
// TODO this should probably append .html to non-described files
// create the file if that was successful
if let Err(err) = fs::write(&basepath.join(filename), data).await {
error!("File creation: {err} {:?}", url_path);
}
}
} }
} else if err.kind() == ErrorKind::NotADirectory {
// Example:
// 1. example.com/user
// 2. example.com/user/post
// If file 1 exists it will prevent file 2 from existing
// FIXME
error!("One of the parent directories is actually a file...") fn valid_file_extension(take: &&OsStr) -> bool {
} else { let los = take.to_string_lossy();
error!("File open error: {err} {:?}", filename); let all = los.split('.');
} match all.last() {
// we don't care about other errors, we can't/shouldn't fix them Some(s) => {
None match s.to_lowercase().as_str() {
"html" => true,
"css" => true,
"js" => true,
"ts" => true,
"otf" => true, // font
"png" => true,
"svg" => true,
"jpg" => true,
"jpeg" => true,
"mp4" => true,
"mp3" => true,
"webp" => true,
"pdf" => true,
"json" => true,
"xml" => true,
_ => {
warn!("Might be forgetting a file extension: {s}");
false
} }
} }
},
None => false,
}
} }
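
To make the new path scheme concrete: a URL whose last segment has an extension in `valid_file_extension`'s allow-list keeps its filename, and everything else is treated as a directory and saved as `index.html` inside it. The helper below is illustrative only (it re-states the mapping with a shortened allow-list) and is not part of the diff:

```rust
use std::{ffi::OsStr, path::PathBuf};
use url::Url;

// Illustrative re-statement of the mapping used by the new store().
fn download_path(url: &Url) -> PathBuf {
    let raw = PathBuf::from(format!(
        "./downloaded/{}{}",
        url.domain().unwrap_or("UnknownDomain"),
        url.path()
    ));
    let known_ext = raw
        .extension()
        .and_then(OsStr::to_str)
        .map(|e| matches!(e.to_lowercase().as_str(), "html" | "css" | "js" | "png" | "pdf"))
        .unwrap_or(false);
    if known_ext { raw } else { raw.join("index.html") }
}

fn main() {
    let page = Url::parse("https://en.wikipedia.org/wiki/Main_Page").expect("valid url");
    let asset = Url::parse("https://en.wikipedia.org/static/favicon.png").expect("valid url");
    // ./downloaded/en.wikipedia.org/wiki/Main_Page/index.html
    println!("{:?}", download_path(&page));
    // ./downloaded/en.wikipedia.org/static/favicon.png
    println!("{:?}", download_path(&asset));
}
```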

src/main.rs

@@ -1,83 +1,31 @@
#![feature(ip_from)] #![feature(ip_from)]
#![feature(path_add_extension)]
#![deny(clippy::unwrap_used)]
extern crate html5ever; extern crate html5ever;
use std::{ use std::{
collections::HashSet, collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}
fs::File,
io::Read,
sync::{Arc, LazyLock},
}; };
use db::{connect, Website}; use db::{connect, Website};
use futures_util::StreamExt; use metrics::{counter, gauge};
use opentelemetry::{ use metrics_exporter_prometheus::PrometheusBuilder;
global::{self},
metrics::{Counter, Meter, UpDownCounter},
};
use opentelemetry_otlp::{Protocol, WithExportConfig};
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
use serde::Deserialize; use serde::Deserialize;
use surrealdb::{engine::remote::ws::Client, Surreal}; use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::{ use tokio::task::JoinSet;
io::{AsyncReadExt, AsyncWriteExt, BufWriter}, use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span};
sync::RwLock,
task::JoinSet,
};
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry}; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
use crate::db::get_next;
mod db; mod db;
mod filesystem;
mod parser; mod parser;
mod filesystem;
static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper")); const GET_METRIC: &str = "total_gets";
static BATCH_SIZE: LazyLock<Counter<u64>> = const GET_IN_FLIGHT: &str = "gets_in_flight";
LazyLock::new(|| METER.u64_counter("crawler_batch_size").build()); const SITES_CRAWLED: &str = "pages_crawled";
static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| { const BEING_PROCESSED: &str = "pages_being_processed";
METER
.i64_up_down_counter("crawler_pages_being_processed")
.build()
});
static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_parsed")
.build()
});
static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_streamed")
.build()
});
static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
static SITES_CRAWLED: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
static CONFIG: LazyLock<Config> = LazyLock::new(|| {
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
config
});
// FIXME Traces aren't working on multiple threads, they block
// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
tracing_endpoint: String,
metrics_endpoint: String,
log_file: String,
surreal_ns: String, surreal_ns: String,
surreal_db: String, surreal_db: String,
surreal_url: String, surreal_url: String,
@@ -85,302 +33,15 @@ struct Config {
surreal_password: String, surreal_password: String,
crawl_filter: String, crawl_filter: String,
start_url: String,
budget: usize, budget: usize,
batch_size: usize,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
println!("Logs and metrics are provided to the Grafana dashboard");
// Start TRACE / LOGGING / METRICS
load_logging(&CONFIG); // this seems to be working ok
global::set_tracer_provider(load_tracing(&CONFIG));
global::set_meter_provider(load_metrics(&CONFIG));
BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let crawled = Arc::new(RwLock::new(0));
let starting_url = &CONFIG.start_url;
let db = connect(&CONFIG)
.await
.expect("Failed to connect to surreal, aborting.");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls()
.gzip(true)
.build()
.expect("Failed to build reqwest client.");
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work.
// let mut span = TRACER.start("Pre-Loop");
let site = Website::new(starting_url, false);
process(site, db.clone(), reqwest.clone()).await;
// span.end();
// let mut main_loop_span= TRACER.start("Main-Loop");
let mut futures = JoinSet::new();
for _ in 0..CONFIG.batch_size {
futures.spawn(process_single_thread(
&CONFIG,
db.clone(),
reqwest.clone(),
crawled.clone(),
));
}
while let Some(_) = futures.join_next().await {
// Budget - Threads - This thread (1)
// Would roughly be the acceptable amount at which a thread should exit
if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
warn!("Thread terminated early, restarting");
futures.spawn(process_single_thread(
&CONFIG,
db.clone(),
reqwest.clone(),
crawled.clone(),
));
}
}
futures.join_all().await;
// main_loop_span.end();
info!("Done");
}
async fn process_single_thread(
config: &Config,
db: Surreal<Client>,
reqwest: reqwest::Client,
crawled: Arc<RwLock<usize>>,
) {
while *(crawled.read().await) < config.budget {
let uncrawled = get_next(&db.clone(), &config).await;
match uncrawled {
Some(site) => {
process(site, db.clone(), reqwest.clone()).await;
SITES_CRAWLED.add(1, &[]);
// Somehow this write doesn't hang on the while's read?
let mut c = crawled.write().await;
*c += 1;
}
None => {
warn!("fn::get_next() returned None");
return;
}
}
}
}
#[instrument(skip(db, reqwest))]
/// Downloads and crawls and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// METRICS
debug!(url = &site.site.as_str(), "Process: {}", &site.site);
BEING_PROCESSED.add(1, &[]);
// let mut process_span = TRACER.start("Process");
// Build the request
let request_builder = reqwest.get(site.site.to_string());
// Send the http request (get)
GET_IN_FLIGHT.add(1, &[]);
if let Ok(response) = request_builder.send().await {
let mut skip_download = false;
GET_IN_FLIGHT.add(-1, &[]);
let headers = response.headers();
let code = response.status();
if code != 200 {
warn!("{code} for {}", site.site.as_str());
}
#[allow(non_snake_case)]
let CT = headers.get("Content-Type");
let ct = headers.get("content-type");
let ct = match (CT, ct) {
(None, None) => {
warn!(
"Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
site.site.to_string(),
headers
);
return;
}
(None, Some(a)) => a,
(Some(a), None) => a,
(Some(a), Some(_)) => a,
};
// create filepath (handles / -> /index.html)
let real_path = filesystem::as_path(&site.site, ct);
let mut tmp_path = real_path.clone();
if !(tmp_path.add_extension("crawl_temp")) {
warn!("Failed to add extension to file");
// fallback ig
tmp_path = tmp_path.with_extension("crawl_temp");
}
// CODE FOR UPDATING DOWNLOADED CONTENT:
// Check the Content-Length header (we assume the server is telling the truth) (I don't see
// a reason for it to lie in this case).
// And see if the file on the disk is the same length.
// Yes, technically this isn't perfect, but the other option is storing ETags, which I
// don't want to do right now.
if let Some(len) = headers.get("Content-Length") {
if let Ok(s) = len.to_str() {
// length is in bytes
if let Ok(len) = s.parse::<u64>() {
if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
if disk_len == len {
skip_download = true;
}
} else {
// File not found (or other error).
// Program will continue on it's way, downloading content.
}
}
}
}
// make sure that the file is good to go
if let Some(file) = filesystem::init(&tmp_path).await {
// Get body from response
// stream the response onto the disk
let mut stream = response.bytes_stream();
let should_parse = real_path.to_string_lossy().ends_with(".html");
let mut buf: Vec<u8> = Vec::new();
if skip_download && should_parse {
// since we are skipping the download we will just read the file off the disk to
// parse it
if let Ok(mut file) = tokio::fs::OpenOptions::new()
.read(true)
.open(&real_path).await
{
if let Err(err) = file.read_to_end(&mut buf).await {
warn!("Failed to read file off disk for parsing, {}", err);
}
}
}
// !!!DOWNLOADING TIME!!!
if !skip_download {
let mut writer = BufWriter::new(file);
// Write file to disk
trace!("Writing at: {:?}", tmp_path);
BEING_STREAMED.add(1, &[]);
// let mut stream_span = TRACER.start("Stream");
while let Some(data) = stream.next().await {
match data {
Ok(data) => {
TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
let _ = writer.write_all(&data).await;
// If we are going to parse this file later, we will save it
// into memory as well as the disk.
// We do this because the data here might be incomplete
if should_parse {
data.iter().for_each(|f| buf.push(*f));
}
}
Err(err) => {
error!("{err}")
}
}
}
let _ = writer.flush().await;
// rename the temp file into the real file name
if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
error!(
from = &*tmp_path.to_string_lossy(),
to = &*real_path.to_string_lossy(),
"Error renaming file: {}",
err
);
}
// stream_span.end();
BEING_STREAMED.add(-1, &[]);
}
// (If needed) Parse the file
if should_parse {
BEING_PARSED.add(1, &[]);
// let mut parsing_span = TRACER.start("Parsing");
// Parse document and get relationships
let sites = parser::parse(&site, &buf).await;
// De-duplicate this list
let prev_len = sites.len();
let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
set.insert(item);
set
});
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
let diff = prev_len - de_dupe_sites.len();
trace!("Saved {diff} from being entered into the db by de-duping");
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
// parsing_span.end();
BEING_PARSED.add(-1, &[]);
} else {
trace!(url = site.site.as_str(), "Parse = False");
}
// update self in db
site.crawled = true;
site.status_code = code.as_u16();
Website::store_all(vec![site.clone()], &db).await;
}
} else {
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
}
// process_span.end();
BEING_PROCESSED.add(-1, &[]);
}
fn load_tracing(config: &Config) -> SdkTracerProvider {
// Send spans to Alloy (which will send them to Tempo)
let otlp_span = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(config.tracing_endpoint.clone())
.build()
.unwrap();
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_simple_exporter(otlp_span)
.build();
tracer_provider
}
fn load_logging(config: &Config) {
// let otlp_log = opentelemetry_otlp::LogExporter::builder()
// .with_tonic()
// .with_endpoint(endpoint)
// .build()
// .unwrap();
// let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
// .with_simple_exporter(otlp_log)
// .build();
let writer = std::fs::OpenOptions::new() let writer = std::fs::OpenOptions::new()
.append(true) .append(true)
.create(true) .create(true)
.open(config.log_file.clone()) .open("./docker/logs/tracing.log")
.expect("Couldn't make log file!"); .expect("Couldn't make log file!");
let filter = EnvFilter::builder() let filter = EnvFilter::builder()
@@ -394,22 +55,167 @@ fn load_logging(config: &Config) {
.with_file(true) .with_file(true)
.json() .json()
.with_writer(writer) .with_writer(writer)
.with_filter(filter), .with_filter(filter)
); );
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber"); tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
let builder = PrometheusBuilder::new();
builder
.with_http_listener(std::net::SocketAddr::new(
IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])),
2500,
))
.install()
.expect("failed to install recorder/exporter");
info!("Starting...");
// Would probably take these in as parameters from a cli
let starting_url = "https://en.wikipedia.org/";
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let mut crawled = 0;
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
let db = connect(&config)
.await
.expect("Failed to connect to surreal, aborting.");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls()
.gzip(true)
.build()
.expect("Failed to build reqwest client.");
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work.
let span = trace_span!("Pre-Loop");
let pre_loop_span = span.enter();
// Download the site
let site = Website::new(starting_url, false);
process(site, db.clone(), reqwest.clone()).await;
drop(pre_loop_span);
let span = trace_span!("Loop");
let span = span.enter();
while crawled < config.budget {
let get_num = if config.budget - crawled < 100 {
config.budget - crawled
} else {
100
};
let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
if uncrawled.is_empty() {
info!("Had more budget but finished crawling everything.");
return;
} }
fn load_metrics(config: &Config) -> SdkMeterProvider { {
// Send metrics to Prometheus let mut futures = JoinSet::new();
let otlp_metrics = opentelemetry_otlp::MetricExporter::builder() for site in uncrawled {
.with_http() gauge!(BEING_PROCESSED).increment(1);
.with_protocol(Protocol::HttpBinary) futures.spawn(process(site, db.clone(), reqwest.clone()));
.with_endpoint(config.metrics_endpoint.clone()) // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
.build() // info!("Crawled {crawled} out of {budget} pages. ({percent})");
.unwrap();
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
.build();
metrics_provider
} }
let c = counter!(SITES_CRAWLED);
// As futures complete runs code in while block
while futures.join_next().await.is_some() {
c.increment(1);
gauge!(BEING_PROCESSED).decrement(1);
crawled += 1;
}
}
}
drop(span);
info!("Done");
}
#[instrument(skip(db, reqwest))]
/// Downloads and crawls and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// METRICS
trace!("Process: {}", &site.site);
// Build the request
let request_builder = reqwest.get(site.site.to_string());
// METRICS
let g = gauge!(GET_IN_FLIGHT);
g.increment(1);
// Send the http request (get)
if let Ok(response) = request_builder.send().await {
// METRICS
g.decrement(1);
counter!(GET_METRIC).increment(1);
// Get body from response
let data = response
.text()
.await
.expect("Failed to read http response's body!");
// Store document
filesystem::store(&data, &site.site).await;
// Parse document and get relationships
let sites = parser::parse(&site, &data).await;
// update self in db
site.set_crawled();
Website::store_all(vec![site], &db).await;
// De-duplicate this list
let prev_len = sites.len();
let set = sites.into_iter().fold(HashSet::new(), |mut set,item| {
set.insert(item);
set
});
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
let diff = prev_len - de_dupe_sites.len();
trace!("Saved {diff} from being entered into the db by de-duping");
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
} else {
error!("Failed to get: {}", &site.site);
}
}
/// Returns uncrawled links
#[instrument(skip(db))]
async fn get_uncrawled_links(
db: &Surreal<Client>,
mut count: usize,
filter: String,
) -> Vec<Website> {
if count > 100 {
count = 100
}
debug!("Getting uncrawled links");
let mut response = db
.query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
.bind(("format", filter))
.bind(("count", count))
.await
.expect("Hard-coded query failed..?");
response
.take(0)
.expect("Returned websites couldn't be parsed")
}

src/parser.rs

@@ -1,10 +1,11 @@
use std::default::Default; use std::default::Default;
use std::str::FromStr;
use html5ever::tokenizer::{BufferQueue, TokenizerResult}; use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken}; use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts}; use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*}; use html5ever::{local_name, tendril::*};
use tracing::{error, instrument, trace, warn}; use tracing::{debug, error, instrument, trace, warn};
use url::Url; use url::Url;
use crate::db::Website; use crate::db::Website;
@@ -12,6 +13,7 @@ use crate::db::Website;
impl TokenSink for Website { impl TokenSink for Website {
type Handle = Vec<Website>; type Handle = Vec<Website>;
#[instrument(skip(token, _line_number))]
fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> { fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
match token { match token {
TagToken(tag) => { TagToken(tag) => {
@@ -32,13 +34,13 @@ impl TokenSink for Website {
let attr_name = attr.name.local.to_string(); let attr_name = attr.name.local.to_string();
if attr_name == "src" || attr_name == "href" || attr_name == "data" if attr_name == "src" || attr_name == "href" || attr_name == "data"
{ {
trace!(url = self.site.as_str(),"Found `{}` in html `{}` tag", &attr.value, tag.name); trace!("Found `{}` in html `{}` tag", &attr.value, tag.name);
let url = try_get_url(&self.site, &attr.value); let url = try_get_url(&self.site, &attr.value);
if let Some(mut parsed) = url { if let Some(mut parsed) = url {
parsed.set_query(None); parsed.set_query(None);
parsed.set_fragment(None); parsed.set_fragment(None);
trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string()); debug!("Final cleaned URL: `{}`", parsed.to_string());
let web = Website::new(&parsed.to_string(), false); let web = Website::new(&parsed.to_string(), false);
links.push(web); links.push(web);
} }
@@ -59,16 +61,14 @@ impl TokenSink for Website {
} }
} }
#[instrument(skip(data))] #[instrument(skip_all)]
/// Parses the passed site and returns all the sites it links to. /// Parses the passed site and returns all the sites it links to.
pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> { pub async fn parse(site: &Website, data: &str) -> Vec<Website> {
trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
// prep work // prep work
let mut other_sites: Vec<Website> = Vec::new(); let mut other_sites: Vec<Website> = Vec::new();
// change data into something that can be tokenized // change data into something that can be tokenized
let s: Result<Tendril<fmt::UTF8>, ()> = Tendril::try_from_byte_slice(data); let chunk = Tendril::from_str(data).expect("Failed to parse string into Tendril!");
if let Ok(chunk) = s {
// create buffer of tokens and push our input into it // create buffer of tokens and push our input into it
let token_buffer = BufferQueue::default(); let token_buffer = BufferQueue::default();
token_buffer.push_back( token_buffer.push_back(
@@ -86,9 +86,6 @@ pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
} }
assert!(token_buffer.is_empty()); assert!(token_buffer.is_empty());
tokenizer.end(); tokenizer.end();
} else {
warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
}
other_sites other_sites
} }
@@ -99,7 +96,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
Ok(ok) => Some(ok), Ok(ok) => Some(ok),
Err(e) => { Err(e) => {
if link.starts_with('#') { if link.starts_with('#') {
trace!(url = parent.as_str(), "Rejecting # url"); trace!("Rejecting # url");
None None
} else if link.starts_with("//") { } else if link.starts_with("//") {
// if a url starts with "//" is assumed that it will adopt // if a url starts with "//" is assumed that it will adopt
@@ -107,34 +104,32 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
// https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute // https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
let scheme = parent.scheme(); let scheme = parent.scheme();
match Url::parse(&format!("{scheme}://{link}")) { match Url::parse(&format!("{scheme}://{}", link)) {
Ok(url) => Some(url), Ok(url) => Some(url),
Err(err) => { Err(err) => {
error!("Failed parsing relative scheme url: {}", err); error!("Failed parsing realative scheme url: {}", err);
None None
} }
} }
} else { } else {
// # This is some sort of relative url, gonna try patching it up into an absolute // # This is some sort of realative url, gonna try patching it up into an absolute
// url // url
match e { match e {
url::ParseError::RelativeUrlWithoutBase => { url::ParseError::RelativeUrlWithoutBase => {
// Is: scheme://host:port // Is: scheme://host:port
let mut origin = parent.origin().ascii_serialization(); let origin = parent.origin().ascii_serialization();
if !origin.ends_with('/') && !link.starts_with('/') {
origin += "/";
}
let url = origin.clone() + link; let url = origin.clone() + link;
trace!("Built `{url}` from `{origin} + {}`", link.to_string());
if let Ok(url) = Url::parse(&url) { if let Ok(url) = Url::parse(&url) {
trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string()); trace!("Saved relative url `{}` AS: `{}`", link, url);
Some(url) Some(url)
} else { } else {
error!( error!(
"Failed to reconstruct a url from relative url: `{}` on site: `{}`. Failed url was: {}", "Failed to reconstruct a url from relative url: `{}` on site: `{}`",
link, link,
parent.to_string(), parent.to_string()
url
); );
None None
} }
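
A few concrete cases for `try_get_url`, written as an illustrative (not committed) test module; the expectations follow the right-hand version, which glues relative links directly onto the parent's origin, so they only resolve cleanly when the link already starts with `/` (the left-hand version inserts the missing slash itself):

```rust
#[cfg(test)]
mod link_resolution_tests {
    use super::try_get_url;
    use url::Url;

    #[test]
    fn resolves_links_like_the_crawler() {
        let parent = Url::parse("https://en.wikipedia.org/wiki/Main_Page").expect("valid url");

        // Absolute links are used as-is.
        assert!(try_get_url(&parent, "https://example.com/a").is_some());

        // Fragment-only links are rejected.
        assert!(try_get_url(&parent, "#History").is_none());

        // Protocol-relative links adopt the parent's scheme.
        let cdn = try_get_url(&parent, "//upload.wikimedia.org/logo.png").expect("should resolve");
        assert_eq!(cdn.scheme(), "https");
        assert_eq!(cdn.host_str(), Some("upload.wikimedia.org"));

        // Root-relative links are glued onto the parent's origin.
        let page = try_get_url(&parent, "/wiki/Rust").expect("should resolve");
        assert_eq!(page.as_str(), "https://en.wikipedia.org/wiki/Rust");
    }
}
```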

src/setup.surql

@@ -4,15 +4,6 @@ DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string;
DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE; DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool; DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
DEFINE FIELD IF NOT EXISTS processing ON TABLE website TYPE bool DEFAULT false;
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now(); DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now(); DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
LET $site = SELECT * FROM ONLY website WHERE crawled = false AND processing = false AND site ~ type::string($filter) LIMIT 1;
UPDATE $site.id SET processing = true;
RETURN $site
};
UPDATE website SET processing = false WHERE processing = true;
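
For context, the removed `fn::get_next` worked hand in hand with the removed `get_next()` in src/db.rs: it selects one uncrawled, unclaimed row matching the filter, flips `processing` to true so no other worker picks it up, and the trailing `UPDATE` clears stale claims on startup. A minimal sketch of the Rust side of that (removed) claim pattern, mirroring the query and bind shown in the db.rs diff:

```rust
use surrealdb::{engine::remote::ws::Client, Surreal};

use crate::db::Website;

/// Sketch of the removed claim pattern: the database function marks the
/// returned row with processing = true, so two workers never take the same page.
async fn claim_next(db: &Surreal<Client>, filter: &str) -> Option<Website> {
    let mut response = db
        .query("fn::get_next($format)")
        .bind(("format", filter.to_string()))
        .await
        .ok()?;
    let site: Option<Website> = response.take(0).ok()?;
    site
}
```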