multithreading #2

Merged
Oliver merged 19 commits from multithreading into main 2025-03-19 05:01:00 +00:00
20 changed files with 2412 additions and 754 deletions

.gitignore (vendored, 4 changes)

@@ -1,5 +1,7 @@
 /target
 /.surrealdb
+/.minio
 perf.data
 flamegraph.svg
 perf.data.old
+/docker/logs/*

.vscode/launch.json (vendored, 2 changes)

@@ -9,7 +9,7 @@
       "request": "launch",
       "name": "Debug executable 'surreal_spider'",
       "env": {
-        "RUST_LOG": "surreal_spider=debug,reqwest=info",
+        "RUST_LOG": "surreal_spider=trace,reqwest=info",
       },
       "cargo": {
         "args": [

Cargo.lock (generated, 2061 changes; diff suppressed because it is too large)

Cargo.toml

@@ -4,12 +4,17 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-html5ever = "0.29.0"
-markup5ever_rcdom = "0.5.0-unofficial"
-reqwest = "0.12.9"
-serde = { version = "1.0.214", features = ["derive"] }
-surrealdb = "2.0.4"
+base64 = "0.22.1"
+html5ever = "0.29"
+metrics = "0.24.1"
+metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
+# minio = "0.1.0"
+minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"}
+reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls"] }
+serde = { version = "1.0", features = ["derive"] }
+surrealdb = "2.2"
 tokio = { version="1.41.0", features = ["full"] }
-tracing = "0.1.40"
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
-url = { version = "2.5.3", features = ["serde"] }
+toml = "0.8.20"
+tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
+url = { version = "2.5", features = ["serde"] }

Crawler.toml (new file, 16 additions)

@@ -0,0 +1,16 @@
# Surreal config
surreal_url = "localhost:8000"
surreal_username = "root"
surreal_password = "root"
surreal_ns = "test"
surreal_db = "v1.12"
# Minio config
s3_bucket = "v1.12"
s3_url = "http://localhost:9000"
s3_access_key = "jLDPKGuu513VENc8kJwX"
s3_secret_key = "4T1nymEzsGYOlKSAb1WX7V3stnQn9a5ZoTQjDfcL"
# Crawler config
crawl_filter = "en.wikipedia.com"
budget = 200
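
For reference, a minimal sketch of how a config in this shape can be loaded. It is not part of this diff; it simply mirrors the Config struct and toml::from_str call added in src/main.rs further down, with a standalone main added only for illustration.

use serde::Deserialize;

#[derive(Deserialize)]
struct Config {
    surreal_url: String,
    surreal_username: String,
    surreal_password: String,
    surreal_ns: String,
    surreal_db: String,

    s3_bucket: String,
    s3_url: String,
    s3_access_key: String,
    s3_secret_key: String,

    crawl_filter: String,
    budget: usize,
}

fn main() {
    // Read Crawler.toml from the working directory and deserialize it.
    let raw = std::fs::read_to_string("./Crawler.toml").expect("Failed to read Crawler.toml");
    let config: Config = toml::from_str(&raw).expect("Failed to parse Crawler.toml");
    println!("crawling {} with a budget of {}", config.crawl_filter, config.budget);
}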

README.md

@@ -1,23 +1,11 @@
 # Surreal Crawler
 
-Mapping with a budget of 1000 (crawl 1000 sites, so many more links are actually discovered), on [my webiste](https://oliveratkinson.net) on 8/26/2024 took 1m9s.
-
-This is including the crawl and loading into the database and linking sites. (Locally hosted surreal db instance)
-
-This run created 4299 site links with 23286 links between the sites. (It found my this git site which really bolsters those numbers.)
-
-## Install / Build
-
-* You will need rust to compile the crawler [rustup.rs](https://rustup.rs)
-* You need python3 (will come installed on most linux distros) and poetry for dependancy management.
-* Install `pipx`, `python3`
-* Then: `pipx install poetry`
-* Then: `poetry install` to install the project dependancies
-* You need to install [surrealdb](https://surrealdb.com)
-
-## Use
-
-Just run `./crawl.sh {url}` and it will start crawling. You can tweak the budget inside [crawl.sh](https://git.oliveratkinson.net/Oliver/internet_mapper/src/branch/main/crawl.sh) if you want.
-
-You can also prefix the command with `time` to benchmark the system, such as: `time ./crawl.sh https://discord.com`.
+Crawls sites, saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links until the crawl budget is reached. It saves the data of each site in a minio database.
+
+### TODO
+
+- [ ] Domain filtering - prevent the crawler from going on alternate versions of wikipedia.
+- [ ] Conditionally save content - based on filename or file contents
+- [ ] GUI / TUI ?
+- [ ] Better asynchronous getting of the sites. Currently it all happens serially.

Docker Compose file (deleted)

@@ -1,16 +0,0 @@
-services:
-  db:
-    image: surrealdb/surrealdb:latest-dev
-    ports:
-      - 8000:8000
-    volumes:
-      - ./.surrealdb/:/mydata
-    command:
-      - start
-      - --log
-      - debug
-      - --user
-      - root
-      - --pass
-      - root
-      - rocksdb:/mydata/database.db

docker/alloy.conf (new file, 14 additions)

@@ -0,0 +1,14 @@
local.file_match "tmplogs" {
    path_targets = [{"__path__" = "/tmp/alloy-logs/*.log"}]
}

loki.source.file "local_files" {
    targets = local.file_match.tmplogs.targets
    forward_to = [loki.write.local_loki.receiver]
}

loki.write "local_loki" {
    endpoint {
        url = "http://loki:3100/loki/api/v1/push"
    }
}
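
The log lines this config tails come from the crawler itself: src/main.rs (further down in this diff) writes JSON tracing output to ./docker/logs/tracing.log, and compose.yml mounts ./logs into the Alloy container at /tmp/alloy-logs. A trimmed sketch of that logging setup, mirroring the code added in this PR:

use tracing_subscriber::{fmt, layer::SubscriberExt, Registry};

fn init_logging() {
    // ./docker/logs is mounted at /tmp/alloy-logs inside the alloy container (see compose.yml).
    let writer = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open("./docker/logs/tracing.log")
        .expect("Couldn't make log file!");

    // JSON output lets the Loki pipeline filter on fields such as `level` and `fields_message`.
    let registry = Registry::default().with(fmt::layer().json().with_writer(writer));
    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
}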

docker/compose.yml (new file, 83 additions)

@@ -0,0 +1,83 @@
services:
  surreal:
    image: surrealdb/surrealdb:latest-dev
    ports:
      - 8000:8000
    volumes:
      - surrealdb_storage:/mydata
    command:
      - start
      - --log
      - debug
      - --user
      - root
      - --pass
      - root
      - rocksdb:/mydata/database.db

  minio:
    image: quay.io/minio/minio
    ports:
      - 9000:9000
      - 9001:9001
    environment:
      - MINIO_ROOT_USER=root
      - MINIO_ROOT_PASSWORD=an8charpassword
    volumes:
      - minio_storage:/data
    command:
      - server
      - /data
      - --console-address
      - ":9001"

  alloy:
    image: grafana/alloy:latest
    ports:
      - 12345:12345
    volumes:
      # if you change this, you also need to change it in the alloy config file
      - ./logs/:/tmp/alloy-logs
      - ./alloy.conf:/etc/alloy/config.alloy
      - alloy_storage:/var/lib/alloy
    command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy

  #logs
  loki:
    image: grafana/loki:latest
    ports:
      - 3100:3100
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - ./loki.yaml:/etc/loki/local-config.yaml

  # Metrics collector
  prometheus:
    image: prom/prometheus:latest
    expose:
      - 9090
    volumes:
      - ./prometheus.yaml:/etc/prometheus/prometheus.yml
      # persist data
      - prometheus_storage:/prometheus
    command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml

  # Everything viewer
  grafana:
    image: grafana/grafana:latest
    volumes:
      - ./grafana.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
      - grafana_storage:/var/lib/grafana
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
      - GF_AUTH_DISABLE_LOGIN_FORM=true
      - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
    ports:
      - 3000:3000

volumes:
  prometheus_storage:
  grafana_storage:
  alloy_storage:
  surrealdb_storage:
  minio_storage:

docker/grafana.yaml (new file, 24 additions)

@@ -0,0 +1,24 @@
apiVersion: 1

datasources:
  - name: Loki
    type: loki
    access: proxy
    orgId: 1
    url: http://loki:3100
    basicAuth: false
    isDefault: true
    version: 1
    editable: false
  - name: Prometheus
    type: prometheus
    uid: prometheus
    access: proxy
    orgId: 1
    url: http://prometheus:9090
    basicAuth: false
    isDefault: false
    version: 1
    editable: false
    jsonData:
      httpMethod: GET

Grafana dashboard JSON (new file)

@@ -0,0 +1,223 @@
{
"__inputs": [
{
"name": "DS_LOKI",
"label": "Loki",
"description": "",
"type": "datasource",
"pluginId": "loki",
"pluginName": "Loki"
}
],
"__elements": {},
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "11.3.1"
},
{
"type": "panel",
"id": "logs",
"name": "Logs",
"version": ""
},
{
"type": "datasource",
"id": "loki",
"name": "Loki",
"version": "1.0.0"
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"panels": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `ERROR` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Errors",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 3,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `WARN` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Warnings",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 2,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `DEBUG` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Debug",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"id": 4,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `TRACE` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Trace",
"type": "logs"
}
],
"schemaVersion": 40,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "New dashboard",
"uid": "ceg90x34pqgowd",
"version": 4,
"weekStart": ""
}

docker/loki.yaml (new file, 62 additions)

@@ -0,0 +1,62 @@
# this is mostly the default config from grafana's website
auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096
  log_level: info
  grpc_server_max_concurrent_streams: 1000

common:
  instance_addr: 127.0.0.1
  path_prefix: /tmp/loki
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100

limits_config:
  metric_aggregation_enabled: true

schema_config:
  configs:
    - from: 2020-10-24
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

pattern_ingester:
  enabled: true
  metric_aggregation:
    loki_address: localhost:3100

frontend:
  encoding: protobuf

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
analytics:
  reporting_enabled: false

docker/prometheus.yaml (new file, 16 additions)

@@ -0,0 +1,16 @@
global:
  scrape_interval: 5s
  query_log_file: /etc/prometheus/query.log

scrape_configs:
  - job_name: crawler
    static_configs:
      # change this to your machine's ip, localhost won't work
      # because localhost refers to the docker container.
      - targets: ['192.168.8.209:2500']
  - job_name: loki
    static_configs:
      - targets: ['loki:3100']
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']
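
The crawler target on port 2500 above is served by the metrics-exporter-prometheus recorder installed at startup. A trimmed sketch of that setup, mirroring src/main.rs below but using the stable Ipv4Addr::new instead of the nightly-only from_octets; it must run inside the Tokio runtime because install() spawns the HTTP listener task.

use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use metrics_exporter_prometheus::PrometheusBuilder;

fn install_metrics_exporter() {
    // Bind on all interfaces so the Prometheus container can scrape the host on :2500.
    PrometheusBuilder::new()
        .with_http_listener(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 2500))
        .install()
        .expect("failed to install recorder/exporter");
}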

jsconfig.json (new file, 16 additions)

@@ -0,0 +1,16 @@
{
  "compilerOptions": {
    "module": "ESNext",
    "moduleResolution": "Bundler",
    "target": "ES2022",
    "jsx": "react",
    "allowImportingTsExtensions": true,
    "strictNullChecks": true,
    "strictFunctionTypes": true
  },
  "exclude": [
    "node_modules",
    "**/node_modules/*"
  ],
  "typeAcquisition": {"include": ["firefox-webext-browser"]}
}

SurrealQL schema file (deleted)

@@ -1,2 +0,0 @@
-DEFINE TABLE website SCHEMALESS;
-DEFINE FIELD accessed_at ON TABLE website VALUE time::now();

src/db.rs

@@ -1,25 +1,36 @@
+use std::fmt::Debug;
+
+use metrics::counter;
 use serde::{Deserialize, Serialize};
 use surrealdb::{
-    engine::remote::ws::{Client, Ws},
-    opt::auth::Root,
-    sql::Thing,
-    Response, Surreal,
+    engine::remote::ws::{Client, Ws}, error::Db, opt::auth::Root, sql::Thing, Response, Surreal
 };
 use tracing::{error, instrument, trace, warn};
 use url::Url;
 
-use crate::Timer;
+use crate::{Config, Timer};
+
+const ROUND_TRIP_METRIC: &'static str = "surql_trips";
+const STORE: &'static str = "surql_store_calls";
+const LINK: &'static str = "surql_link_calls";
 
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone)]
 pub struct Website {
     /// The url that this data is found at
-    site: Url,
+    pub site: Url,
     /// Wether or not this link has been crawled yet
     pub crawled: bool,
     #[serde(skip_serializing)]
     id: Option<Thing>,
 }
 
+// manual impl to make tracing look nicer
+impl Debug for Website {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let site = (self.site.domain().unwrap_or("n/a")).to_string() + self.site.path();
+        f.debug_struct("Website").field("site", &site).finish()
+    }
+}
+
 impl Website {
     /// Creates a blank site (assumes that url param is site's root)
     pub fn new(url: &str, crawled: bool) -> Self {

@@ -39,12 +50,9 @@ impl Website {
         self.crawled = true
     }
 
-    pub fn mut_url(&mut self) -> &mut Url {
-        &mut self.site
-    }
-
     #[instrument(skip_all)]
     pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {
         let len = other.len();
         if len == 0 {return}

@@ -55,6 +63,8 @@ impl Website {
         let timer = Timer::start(&msg);
         // prevent the timer from being dropped instantly.
         let _ = timer;
+        counter!(ROUND_TRIP_METRIC).increment(1);
+        counter!(LINK).increment(1);
         match db
             .query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)")
             .bind(("in", from))

@@ -85,22 +95,29 @@ impl Website {
         }
     }
 
-    #[instrument(skip_all)]
-    pub async fn store(&mut self, db: &Surreal<Client>) -> Option<Thing> {
+    #[instrument(name = "surql_store", skip_all)]
+    pub async fn store(&self, db: &Surreal<Client>) -> Option<Thing> {
+        counter!(STORE).increment(1);
+        let counter = counter!(ROUND_TRIP_METRIC);
+        let t = Timer::start("Stored link");
+        let _ = t;
+        counter.increment(1);
         // check if it's been gone thru before
         let mut response = db
             .query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1")
            .bind(("site", self.site.to_string()))
            .await
-            .unwrap();
+            .expect("Failed to check surreal for duplicates!");
 
-        if let Some(old) = response.take::<Option<Website>>(0).unwrap() {
+        if let Some(old) = response.take::<Option<Website>>(0).expect("Failed to read response from surreal for duplicates.") {
             // site exists already
             if let Some(id) = old.id {
                 // make sure to preserve the "crawled status"
                 let mut new = self.clone();
                 new.crawled = old.crawled | new.crawled;
+                counter.increment(1);
                 // update the record
                 match db.upsert((id.tb, id.id.to_string())).content(new).await {
                     Ok(e) => {

@@ -110,11 +127,23 @@ impl Website {
                         }
                     }
                     Err(e) => {
-                        error!("{}", e);
+                        match e {
+                            surrealdb::Error::Db(error) => {
+                                match error {
+                                    Db::QueryCancelled => todo!(),
+                                    Db::QueryNotExecuted => todo!(),
+                                    Db::QueryNotExecutedDetail { message: _ } => todo!(),
+                                    _=>{},
+                                }
+                            },
+                            _=>{},
+                        }
+                        // error!("{}", e);
                     }
                 };
             }
         } else {
+            counter.increment(1);
             // sites hasn't existed yet
             match db.create("website").content(self.clone()).await {
                 Ok(e) => {

@@ -149,19 +178,30 @@ pub struct Record {
     pub id: Thing,
 }
 
-pub async fn connect() -> surrealdb::Result<Surreal<Client>> {
+#[instrument(skip_all, name = "SurrealDB")]
+pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
+    trace!("Establishing connection to surreal...");
     // Connect to the server
-    let db = Surreal::new::<Ws>("127.0.0.1:8000").await?;
+    let db = Surreal::new::<Ws>(&config.surreal_url).await?;
 
+    trace!("Logging in...");
     // Signin as a namespace, database, or root user
     db.signin(Root {
-        username: "root",
-        password: "root",
+        username: &config.surreal_username,
+        password: &config.surreal_password,
     })
     .await?;
 
     // Select a specific namespace / database
-    db.use_ns("test").use_db("v1.2").await?;
+    db
+        .use_ns(&config.surreal_ns)
+        .use_db(&config.surreal_db)
+        .await?;
+
+    let setup = include_bytes!("setup.surql");
+    let file = setup.iter().map(|c| *c as char).collect::<String>();
+    db.query(file).await.expect("Failed to setup surreal tables.");
 
     Ok(db)
 }
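
A hypothetical usage sketch of the reworked db module: connect once with the parsed Config, then create, mark, and store a page. Names follow the diff above; the example function itself is not part of the PR.

async fn example(config: &Config) -> surrealdb::Result<()> {
    // connect() now reads the endpoint, credentials, namespace and database from Crawler.toml,
    // and runs setup.surql so the website table and its unique index exist.
    let db = connect(config).await?;

    let mut site = Website::new("https://en.wikipedia.org/", false);
    site.set_crawled();

    // store() now takes &self and returns the record id (if any) for use with links_to().
    let _id = site.store(&db).await;
    Ok(())
}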

src/main.rs

@@ -1,60 +1,125 @@
+#![feature(ip_from)]
+
 extern crate html5ever;
-extern crate markup5ever_rcdom as rcdom;
+
+use std::{fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant};
 
 use db::{connect, Website};
-use html5ever::{
-    local_name, parse_document, tendril::TendrilSink, tree_builder::TreeBuilderOpts, ParseOpts,
-};
-use rcdom::RcDom;
-use std::time::Instant;
-use surrealdb::{engine::remote::ws::Client, sql::Thing, Surreal};
-use tracing::{debug, info, instrument, trace, trace_span};
-use tracing_subscriber::EnvFilter;
+use metrics::{counter, gauge};
+use metrics_exporter_prometheus::PrometheusBuilder;
+use s3::S3;
+use serde::Deserialize;
+use surrealdb::{engine::remote::ws::Client, Surreal};
+use tokio::task::JoinSet;
+use tracing::{debug, info, instrument, trace, trace_span, warn};
+use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
 mod db;
+mod parser;
+mod s3;
+
+const GET_METRIC: &'static str = "total_gets";
+const GET_IN_FLIGHT: &'static str = "gets_in_flight";
+const SITES_CRAWLED: &'static str = "pages_crawled";
+const BEING_PROCESSED: &'static str = "pages_being_processed";
+
+#[derive(Deserialize)]
+struct Config {
+    surreal_ns: String,
+    surreal_db: String,
+    surreal_url: String,
+    surreal_username: String,
+    surreal_password: String,
+
+    s3_url: String,
+    s3_bucket: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+
+    crawl_filter: String,
+    budget: usize,
+}
 
 #[tokio::main]
 async fn main() {
-    tracing_subscriber::fmt()
-        .with_env_filter(EnvFilter::from_default_env())
-        .with_line_number(true)
-        .without_time()
-        .init();
-    debug!("Starting...");
+    let total_runtime = Timer::start("Completed");
+
+    let writer = std::fs::OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open("./docker/logs/tracing.log")
+        .expect("Couldn't make log file!");
+
+    let registry = Registry::default()
+        .with(
+            fmt::layer()
+                .with_line_number(true)
+                .with_thread_ids(true)
+                .with_file(true)
+                // .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
+                .json()
+                .with_writer(writer)
+                // .with_filter(EnvFilter::from_default_env())
+        );
+    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
+
+    let builder = PrometheusBuilder::new();
+    builder.with_http_listener(
+            std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::from_octets([0,0,0,0])), 2500)
+        )
+        .install()
+        .expect("failed to install recorder/exporter");
+
+    debug!("Starting...");
 
     // Would probably take these in as parameters from a cli
-    let url = "https://oliveratkinson.net/";
-    // let url = "http://localhost:5500";
-    let budget = 1000;
+    let starting_url = "https://en.wikipedia.org/";
+    // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
+    // let crawl_filter = "en.wikipedia.org/";
+    // let budget = 50;
     let mut crawled = 0;
 
-    let db = connect().await.expect("Failed to connect to db, aborting.");
+    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
+    let mut buf = String::new();
+    let _ = file.read_to_string(&mut buf);
+    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
 
-    let client = reqwest::Client::builder()
+    let db = connect(&config)
+        .await
+        .expect("Failed to connect to surreal, aborting.");
+
+    let s3 = S3::connect(&config)
+        .await
+        .expect("Failed to connect to minio, aborting.\n\nThis probably means you need to login to the minio console and get a new access key!\n\n(Probably here) http://localhost:9001/access-keys/new-account\n\n");
+
+    let reqwest = reqwest::Client::builder()
         // .use_rustls_tls()
+        .gzip(true)
         .build()
-        .unwrap();
+        .expect("Failed to build reqwest client.");
 
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
     let span = trace_span!("Pre-Loop");
     let pre_loop_span = span.enter();
     // Download the site
-    let mut site = Website::new(&url, false);
-    get(&mut site, &db, &client, &mut crawled).await;
+    let site = Website::new(&starting_url, false);
+    get(site, db.clone(), reqwest.clone(), s3.clone()).await;
     drop(pre_loop_span);
 
     let span = trace_span!("Loop");
     let span = span.enter();
-    while crawled < budget {
-        let get_num = if budget - crawled < 100 {
-            budget - crawled
+    while crawled < config.budget {
+        let get_num = if config.budget - crawled < 100 {
+            config.budget - crawled
         } else {
             100
         };
 
-        let uncrawled = get_uncrawled_links(&db, get_num).await;
+        let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
         if uncrawled.len() == 0 {
             info!("Had more budget but finished crawling everything.");
             return;

@@ -64,136 +129,78 @@ async fn main() {
         let span = trace_span!("Crawling");
         let _ = span.enter();
 
-        for mut site in uncrawled {
-            get(&mut site, &db, &client, &mut crawled).await;
-            let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
-            info!("Crawled {crawled} out of {budget} pages. ({percent})");
+        {
+            let mut futures = JoinSet::new();
+            for site in uncrawled {
+                gauge!(BEING_PROCESSED).increment(1);
+                futures.spawn(get(site, db.clone(), reqwest.clone(), s3.clone()));
+                // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
+                // info!("Crawled {crawled} out of {budget} pages. ({percent})");
+            }
+
+            debug!("Joining {} futures...", futures.len());
+
+            let c = counter!(SITES_CRAWLED);
+            // As futures complete runs code in while block
+            while let Some(_) = futures.join_next().await {
+                c.increment(1);
+                gauge!(BEING_PROCESSED).decrement(1);
+                crawled += 1;
+            }
         }
     }
     drop(span);
 
     info!("Done");
+    drop(total_runtime);
 }
 
-#[instrument(skip_all)]
-/// A quick helper function for downloading a url
-async fn get(
-    site: &mut Website,
-    db: &Surreal<Client>,
-    request_client: &reqwest::Client,
-    count: &mut usize,
-) {
+#[instrument(skip (db, s3, reqwest))]
+/// Downloads and crawls and stores a webpage.
+/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
+async fn get(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
     trace!("Get: {}", site.to_string());
 
+    let timer = Timer::start("Built request");
+    let request_builder = reqwest.get(site.to_string());
+    timer.stop();
+
+    let g = gauge!(GET_IN_FLIGHT);
+    g.increment(1);
     let timer = Timer::start("Got page");
-    if let Ok(response) = request_client.get(site.to_string()).send().await {
+
+    if let Ok(response) = request_builder.send().await {
         timer.stop();
+        g.decrement(1);
+        counter!(GET_METRIC).increment(1);
 
+        debug!("Getting body...");
         // Get body
-        let data = response.text().await.unwrap();
-        let opts = ParseOpts {
-            tree_builder: TreeBuilderOpts {
-                drop_doctype: true,
-                ..Default::default()
-            },
-            ..Default::default()
-        };
-
-        // Get DOM
-        let dom = parse_document(RcDom::default(), opts)
-            .from_utf8()
-            .read_from(&mut data.as_bytes())
-            .unwrap();
-
-        // TODO save the dom to minio if a flag is set
-
-        // Modify record in database
-        site.set_crawled();
-        site.store(db).await;
-        trace!("Got: {}", site.to_string());
-
-        // Walk all the children nodes, searching for links to other pages.
-        let mut buffer = Vec::new();
-        let timer = Timer::start("Walked");
-        walk(&dom.document, &db, &site, &mut buffer).await;
-        timer.stop();
-
-        // Put all the found links into the database.
-        site.links_to(buffer, &db).await;
-        *count += 1;
+        let data = response.text().await.expect("Failed to read http response's body!");
+        // Store document
+        s3.store(&data, &site.site).await;
+        // Parse document and store relationships
+        parser::parse(&db, &mut site, &data).await;
+        return;
     }
     trace!("Failed to get: {}", site.to_string());
 }
 
-/// Walks the givin site, placing it's findings in the database
-async fn walk(
-    node: &rcdom::Handle,
-    db: &Surreal<Client>,
-    site: &Website,
-    links_to: &mut Vec<Thing>,
-) {
-    let span = trace_span!("Walk");
-    let span = span.enter();
-
-    // Match each node - node basically means element.
-    match &node.data {
-        rcdom::NodeData::Element { name, attrs, .. } => {
-            for attr in attrs.borrow().clone() {
-                match name.local {
-                    local_name!("a")
-                    | local_name!("audio")
-                    | local_name!("area")
-                    | local_name!("img")
-                    | local_name!("link")
-                    | local_name!("object")
-                    | local_name!("source")
-                    | local_name!("base")
-                    | local_name!("video") => {
-                        let attribute_name = attr.name.local.to_string();
-                        if attribute_name == "src"
-                            || attribute_name == "href"
-                            || attribute_name == "data"
-                        {
-                            // Get clone of the current site object
-                            let mut web = site.clone();
-
-                            // Set url
-                            let url = web.mut_url();
-                            url.set_fragment(None); // removes #xyz
-                            let joined = url.join(&attr.value).unwrap();
-                            *url = joined;
-
-                            // Set other attributes
-                            web.crawled = false;
-                            // TODO set element name
-                            // let element_name = name.local.to_string();
-
-                            if let Some(id) = web.store(db).await {
-                                links_to.push(id);
-                            }
-                        }
-                    }
-                    local_name!("button") | local_name!("meta") | local_name!("iframe") => {
-                        // dbg!(attrs);
-                    }
-                    _ => {}
-                };
-            }
-        }
-        _ => {}
-    };
-    drop(span);
-
-    for child in node.children.borrow().iter() {
-        Box::pin(walk(child, db, site, links_to)).await;
-    }
-}
-
 /// Returns uncrawled links
-async fn get_uncrawled_links(db: &Surreal<Client>, mut count: usize) -> Vec<Website> {
+#[instrument(skip(db))]
+async fn get_uncrawled_links(
+    db: &Surreal<Client>,
+    mut count: usize,
+    filter: String,
+) -> Vec<Website> {
     if count > 100 {
         count = 100
     }
+    debug!("Getting uncrawled links");
+
     let mut response = db
-        .query("SELECT * FROM website WHERE crawled = false LIMIT $count")
+        .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
+        .bind(("format", filter))
         .bind(("count", count))
        .await
        .expect("Hard-coded query failed..?");

@@ -218,7 +225,13 @@ impl<'a> Timer<'a> {
     pub fn stop(&self) -> f64 {
         let dif = self.start.elapsed().as_micros();
         let ms = dif as f64 / 1000.;
-        debug!("{}", format!("{} in {:.3}ms", self.msg, ms));
+
+        if ms > 200. {
+            warn!("{}", format!("{} in {:.3}ms", self.msg, ms));
+        } else {
+            trace!("{}", format!("{} in {:.3}ms", self.msg, ms));
+        }
+
         ms
     }
 }
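
The core of the multithreading change is the crawl loop above: instead of awaiting each get() serially, one task per uncrawled site is spawned into a tokio JoinSet and completions are counted as they land. A stripped-down, self-contained sketch of that pattern (the println stands in for the real get()):

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let uncrawled = vec!["https://example.com/a", "https://example.com/b"];

    let mut futures = JoinSet::new();
    for site in uncrawled {
        // In the real code this is futures.spawn(get(site, db.clone(), reqwest.clone(), s3.clone())).
        futures.spawn(async move { println!("crawling {site}") });
    }

    // Tasks run concurrently; join_next() yields each result as it completes.
    let mut crawled = 0;
    while let Some(result) = futures.join_next().await {
        result.expect("crawl task panicked");
        crawled += 1;
    }
    println!("crawled {crawled} pages");
}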

src/parser.rs (new file, 109 additions)

@@ -0,0 +1,109 @@
use std::default::Default;
use std::str::FromStr;

use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::instrument;

use crate::db::Website;
use crate::Timer;

impl TokenSink for Website {
    type Handle = Vec<Website>;

    fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
        match token {
            TagToken(tag) => {
                if tag.kind == StartTag {
                    match tag.name {
                        local_name!("a")
                        | local_name!("audio")
                        | local_name!("area")
                        | local_name!("img")
                        | local_name!("link")
                        | local_name!("object")
                        | local_name!("source")
                        | local_name!("base")
                        | local_name!("video") => {
                            let mut links = Vec::new();
                            for attr in &tag.attrs {
                                let attr_name = attr.name.local.to_string();
                                if attr_name == "src" || attr_name == "href" || attr_name == "data"
                                {
                                    // Get clone of the current site object
                                    let mut web = self.clone();

                                    // Set url
                                    let mut url = web.site;
                                    url.set_fragment(None); // removes #xyz
                                    let joined = url.join(&attr.value).expect("Failed to join url during parsing!");
                                    web.site = joined;

                                    web.crawled = false;

                                    links.push(web);
                                }
                            }
                            return TokenSinkResult::Script(links);
                        }
                        local_name!("button") | local_name!("meta") | local_name!("iframe") => {
                            // dbg!(attrs);
                        }
                        _ => {}
                    }
                }
            }
            _ => {}
        }
        TokenSinkResult::Continue
    }
}

#[instrument(skip_all)]
pub async fn parse(db: &Surreal<Client>, site: &mut Website, data: &str) {
    // update self in db
    site.set_crawled();
    site.store(db).await;

    // prep work
    let mut other_sites: Vec<Website> = Vec::new();
    { // using blocks to prevent compiler's async worries
        let _t = Timer::start("Parsed page");

        // change data into something that can be tokenized
        let chunk = Tendril::from_str(&data).expect("Failed to parse string into Tendril!");
        // create buffer of tokens and push our input into it
        let mut token_buffer = BufferQueue::default();
        token_buffer.push_back(chunk.try_reinterpret::<fmt::UTF8>().expect("Failed to reinterprt chunk!"));
        // create the tokenizer
        let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());

        // go thru buffer
        while let TokenizerResult::Script(mut sites) = tokenizer.feed(&mut token_buffer) {
            other_sites.append(&mut sites);
            // other_sites.push(sites);
        }

        assert!(token_buffer.is_empty());
        tokenizer.end();
    }

    {
        let mut links_to = Vec::with_capacity(other_sites.len());

        for a in other_sites {
            let other = a.store(db).await;
            if let Some(o) = other {
                links_to.push(o);
            }
        }

        site.links_to(links_to, db).await;
    }
}

src/s3.rs (new file, 105 additions)

@@ -0,0 +1,105 @@
use base64::{alphabet, engine::{self, general_purpose}, Engine};
use metrics::counter;
use minio::s3::{
    args::{BucketExistsArgs, MakeBucketArgs},
    client::ClientBuilder,
    creds::StaticProvider,
    error::Error,
    http::BaseUrl,
    Client,
};
use tracing::{instrument, trace, warn};
use url::Url;

use crate::{Config, Timer};

const CUSTOM_ENGINE: engine::GeneralPurpose = engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);

const ROUND_TRIP_METRIC: &'static str = "s3_trips";

#[derive(Clone)]
pub struct S3 {
    bucket_name: String,
    client: Client,
}

impl S3 {
    #[instrument(skip_all, name = "S3")]
    pub async fn connect(config: &Config) -> Result<Self, Error> {
        let base_url = config
            .s3_url
            .parse::<BaseUrl>()
            .expect("Failed to parse url into BaseUrl");

        let static_provider =
            StaticProvider::new(&config.s3_access_key, &config.s3_secret_key, None);

        let client = ClientBuilder::new(base_url)
            .provider(Some(Box::new(static_provider)))
            .build()?;

        trace!("Checking bucket...");
        let exists = client
            .bucket_exists(
                &BucketExistsArgs::new(&config.s3_bucket)
                    .expect("Failed to check if bucket exists"),
            )
            .await?;

        if !exists {
            trace!("Creating bucket...");
            client
                .make_bucket(
                    &MakeBucketArgs::new(&config.s3_bucket).expect("Failed to create bucket!"),
                )
                .await?;
        }

        trace!("Connection successful");

        Ok(Self {
            bucket_name: config.s3_bucket.to_owned(),
            client: client,
        })
    }

    #[instrument(name = "s3_store", skip_all)]
    pub async fn store(&self, data: &str, url: &Url) {
        let counter = counter!(ROUND_TRIP_METRIC);
        let t = Timer::start("Stored page");
        let _ = t; // prevent compiler drop

        if let Some(domain) = url.domain() {
            let filename = domain.to_owned() + url.path();

            trace!("Created filename: {filename} from raw: {}", url.to_string());

            counter.increment(1);
            let _ = match &self
                .client
                .put_object_content(&self.bucket_name, &filename, data.to_owned())
                .send()
                .await {
                    Ok(_) => {},
                    Err(err) => {
                        match err {
                            Error::InvalidObjectName(_) => {
                                warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try.");

                                let filename: String = domain.to_owned() + &CUSTOM_ENGINE.encode(url.path());

                                counter.increment(1);
                                let _ = &self
                                    .client
                                    .put_object_content(&self.bucket_name, &filename, data.to_owned())
                                    .send()
                                    .await
                                    .unwrap();
                            },
                            _ => {},
                        }
                    },
                };
        }
    }
}
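
The fallback in store() above re-encodes the URL path with URL-safe, unpadded Base64 when MinIO rejects the raw path as an object name. A small sketch of just that naming step (the helper function is illustrative, not part of the PR):

use base64::{alphabet, engine::{self, general_purpose}, Engine};

const CUSTOM_ENGINE: engine::GeneralPurpose =
    engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);

fn fallback_object_name(domain: &str, path: &str) -> String {
    // "example.com" + base64 of the path yields one flat key with no awkward characters.
    domain.to_owned() + &CUSTOM_ENGINE.encode(path)
}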

src/setup.surql (new file, 3 additions)

@@ -0,0 +1,3 @@
DEFINE TABLE IF NOT EXISTS website SCHEMALESS;
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
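
These statements are embedded into the binary and executed by connect() in src/db.rs on every startup, which is why each uses IF NOT EXISTS. A sketch of that load-and-run step, matching the diff above (wrapped in a helper function here only for illustration):

use surrealdb::{engine::remote::ws::Client, Surreal};

async fn run_setup(db: &Surreal<Client>) {
    // include_bytes! embeds setup.surql at compile time; the bytes are rebuilt into a String
    // and executed as one query batch.
    let setup = include_bytes!("setup.surql");
    let file = setup.iter().map(|c| *c as char).collect::<String>();
    db.query(file).await.expect("Failed to setup surreal tables.");
}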