Compare commits

...

20 Commits

Author SHA1 Message Date
135a7e4957 Merge pull request 'multithreading' (#2) from multithreading into main (Reviewed-on: #2) 2025-03-19 05:00:59 +00:00
9aa34b3eee epic metrics 2025-03-19 04:59:50 +00:00
de80418c00 better logging 2025-03-18 16:09:46 -06:00
e3e4175f51 logging improvements 2025-03-18 15:25:56 -06:00
d11e7dd27c the biggest 1 line improvement ever 2025-03-18 15:25:40 -06:00
f2a3e836a0 spelling and clippy 2025-03-18 15:08:29 -06:00
3b4e6a40ce minimize vec resizing 2025-03-18 15:07:50 -06:00
bd0b946245 fixed tracing 2025-03-18 15:02:32 -06:00
b7540a4680 checkpoint - onto profiling 2025-03-18 10:53:06 -06:00
Oliver Atkinson 82929fd0fc updating for base64 2024-12-13 13:28:24 -07:00
Oliver Atkinson f42e770a10 moved to other repo 2024-12-13 11:01:35 -07:00
Oliver Atkinson 611a1e923b starting on the extension 2024-12-12 15:32:04 -07:00
Oliver Atkinson 298ad39a79 rename 2024-12-12 14:59:54 -07:00
Oliver Atkinson 215056e493 use contains operator for better output 2024-12-12 14:26:49 -07:00
Oliver Atkinson 22be3b2f61 updating deps 2024-12-12 14:14:38 -07:00
Oliver Atkinson c1c8cf07bb unifed settings for testing 2024-12-12 11:42:07 -07:00
0f8a3d7215 using a custom parser now :) 2024-11-12 23:08:09 -07:00
574a370f30 readme updates 2024-11-12 21:24:57 -07:00
eaa79b749e prepare get function for s3 2024-11-12 21:19:05 -07:00
2c28d69d55 add s3 support 2024-11-12 21:03:58 -07:00
20 changed files with 2412 additions and 754 deletions

4
.gitignore vendored

@@ -1,5 +1,7 @@
/target
/.surrealdb
/.minio
perf.data
flamegraph.svg
perf.data.old
perf.data.old
/docker/logs/*

2
.vscode/launch.json vendored

@@ -9,7 +9,7 @@
"request": "launch",
"name": "Debug executable 'surreal_spider'",
"env": {
"RUST_LOG": "surreal_spider=debug,reqwest=info",
"RUST_LOG": "surreal_spider=trace,reqwest=info",
},
"cargo": {
"args": [

2061
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@@ -4,12 +4,17 @@ version = "0.1.0"
edition = "2021"
[dependencies]
html5ever = "0.29.0"
markup5ever_rcdom = "0.5.0-unofficial"
reqwest = "0.12.9"
serde = { version = "1.0.214", features = ["derive"] }
surrealdb = "2.0.4"
base64 = "0.22.1"
html5ever = "0.29"
metrics = "0.24.1"
metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
# minio = "0.1.0"
minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"}
reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
surrealdb = "2.2"
tokio = { version="1.41.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.3", features = ["serde"] }
toml = "0.8.20"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
url = { version = "2.5", features = ["serde"] }

16
Crawler.toml Normal file

@@ -0,0 +1,16 @@
# Surreal config
surreal_url = "localhost:8000"
surreal_username = "root"
surreal_password = "root"
surreal_ns = "test"
surreal_db = "v1.12"
# Minio config
s3_bucket = "v1.12"
s3_url = "http://localhost:9000"
s3_access_key = "jLDPKGuu513VENc8kJwX"
s3_secret_key = "4T1nymEzsGYOlKSAb1WX7V3stnQn9a5ZoTQjDfcL"
# Crawler config
crawl_filter = "en.wikipedia.org"
budget = 200
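
For reference, these keys map one-to-one onto the `Config` struct that the new src/main.rs (later in this diff) deserializes with `toml::from_str`. The stand-alone loader below is a minimal sketch of that flow under the same dependencies (`toml`, `serde`), not the exact code from the diff; it condenses the diff's `File::open` + `read_to_string` steps into `fs::read_to_string`.

```rust
use serde::Deserialize;
use std::fs;

// Mirrors the Config struct and Crawler.toml keys shown in this diff.
#[derive(Deserialize)]
struct Config {
    surreal_ns: String,
    surreal_db: String,
    surreal_url: String,
    surreal_username: String,
    surreal_password: String,
    s3_url: String,
    s3_bucket: String,
    s3_access_key: String,
    s3_secret_key: String,
    crawl_filter: String,
    budget: usize,
}

fn load_config() -> Config {
    // Read the TOML file from the working directory and deserialize it via serde.
    let buf = fs::read_to_string("./Crawler.toml").expect("Failed to read Crawler.toml");
    toml::from_str(&buf).expect("Failed to parse Crawler.toml")
}

fn main() {
    let config = load_config();
    println!("crawl budget: {}", config.budget);
}
```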

README.md

@@ -1,23 +1,11 @@
# Surreal Crawler
Mapping with a budget of 1000 (crawl 1000 sites, so many more links are actually discovered), on [my website](https://oliveratkinson.net) on 8/26/2024 took 1m9s.
Crawls sites, saving all the found links to a surrealdb database. It then takes batches of 100 uncrawled links until the crawl budget is reached. It saves the data of each site in a minio bucket.
This includes the crawl, loading into the database, and linking sites. (Locally hosted surreal db instance)
This run created 4299 site links with 23286 links between the sites. (It found my git site, which really bolsters those numbers.)
## Install / Build
* You will need rust to compile the crawler [rustup.rs](https://rustup.rs)
* You need python3 (it comes preinstalled on most linux distros) and poetry for dependency management.
* Install `pipx`, `python3`
* Then: `pipx install poetry`
* Then: `poetry install` to install the project dependencies
* You need to install [surrealdb](https://surrealdb.com)
## Use
Just run `./crawl.sh {url}` and it will start crawling. You can tweak the budget inside [crawl.sh](https://git.oliveratkinson.net/Oliver/internet_mapper/src/branch/main/crawl.sh) if you want.
You can also prefix the command with `time` to benchmark the system, such as: `time ./crawl.sh https://discord.com`.
### TODO
- [ ] Domain filtering - prevent the crawler from going on alternate versions of wikipedia.
- [ ] Conditionally save content - based on filename or file contents
- [ ] GUI / TUI ?
- [ ] Better asynchronous getting of the sites. Currently it all happens serially.
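
As a quick orientation before the code diffs, here is a minimal sketch of the batch-and-budget loop the README describes; it is simplified from the src/main.rs changes later in this diff, and the `Website` stub and empty function bodies are placeholders rather than the real implementations.

```rust
use tokio::task::JoinSet;

// Placeholder for the Website type defined in src/db.rs.
struct Website(String);

// In the real code this queries surrealdb for up to `count` uncrawled sites
// whose url matches the configured crawl_filter.
async fn get_uncrawled_links(count: usize) -> Vec<Website> {
    Vec::with_capacity(count.min(100))
}

// In the real code this downloads the page, stores it in minio, and parses out links.
async fn get(_site: Website) {}

#[tokio::main]
async fn main() {
    let budget = 200; // same default as Crawler.toml
    let mut crawled = 0;
    while crawled < budget {
        // Take batches of at most 100 uncrawled links, as described above.
        let batch = get_uncrawled_links((budget - crawled).min(100)).await;
        if batch.is_empty() {
            break; // had more budget, but finished crawling everything
        }
        // Download and parse each page concurrently; count pages as tasks finish.
        let mut futures = JoinSet::new();
        for site in batch {
            futures.spawn(get(site));
        }
        while futures.join_next().await.is_some() {
            crawled += 1;
        }
    }
}
```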


@@ -1,16 +0,0 @@
services:
db:
image: surrealdb/surrealdb:latest-dev
ports:
- 8000:8000
volumes:
- ./.surrealdb/:/mydata
command:
- start
- --log
- debug
- --user
- root
- --pass
- root
- rocksdb:/mydata/database.db

14
docker/alloy.conf Normal file

@@ -0,0 +1,14 @@
local.file_match "tmplogs" {
  path_targets = [{"__path__" = "/tmp/alloy-logs/*.log"}]
}
loki.source.file "local_files" {
  targets = local.file_match.tmplogs.targets
  forward_to = [loki.write.local_loki.receiver]
}
loki.write "local_loki" {
  endpoint {
    url = "http://loki:3100/loki/api/v1/push"
  }
}

83
docker/compose.yml Normal file

@@ -0,0 +1,83 @@
services:
  surreal:
    image: surrealdb/surrealdb:latest-dev
    ports:
      - 8000:8000
    volumes:
      - surrealdb_storage:/mydata
    command:
      - start
      - --log
      - debug
      - --user
      - root
      - --pass
      - root
      - rocksdb:/mydata/database.db
  minio:
    image: quay.io/minio/minio
    ports:
      - 9000:9000
      - 9001:9001
    environment:
      - MINIO_ROOT_USER=root
      - MINIO_ROOT_PASSWORD=an8charpassword
    volumes:
      - minio_storage:/data
    command:
      - server
      - /data
      - --console-address
      - ":9001"
  alloy:
    image: grafana/alloy:latest
    ports:
      - 12345:12345
    volumes:
      # if you change this, you also need to change it in the alloy config file
      - ./logs/:/tmp/alloy-logs
      - ./alloy.conf:/etc/alloy/config.alloy
      - alloy_storage:/var/lib/alloy
    command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy
  #logs
  loki:
    image: grafana/loki:latest
    ports:
      - 3100:3100
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - ./loki.yaml:/etc/loki/local-config.yaml
  # Metrics collector
  prometheus:
    image: prom/prometheus:latest
    expose:
      - 9090
    volumes:
      - ./prometheus.yaml:/etc/prometheus/prometheus.yml
      # persist data
      - prometheus_storage:/prometheus
    command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml
  # Everything viewer
  grafana:
    image: grafana/grafana:latest
    volumes:
      - ./grafana.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
      - grafana_storage:/var/lib/grafana
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
      - GF_AUTH_DISABLE_LOGIN_FORM=true
      - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
    ports:
      - 3000:3000
volumes:
  prometheus_storage:
  grafana_storage:
  alloy_storage:
  surrealdb_storage:
  minio_storage:

24
docker/grafana.yaml Normal file

@@ -0,0 +1,24 @@
apiVersion: 1
datasources:
  - name: Loki
    type: loki
    access: proxy
    orgId: 1
    url: http://loki:3100
    basicAuth: false
    isDefault: true
    version: 1
    editable: false
  - name: Prometheus
    type: prometheus
    uid: prometheus
    access: proxy
    orgId: 1
    url: http://prometheus:9090
    basicAuth: false
    isDefault: false
    version: 1
    editable: false
    jsonData:
      httpMethod: GET


@@ -0,0 +1,223 @@
{
"__inputs": [
{
"name": "DS_LOKI",
"label": "Loki",
"description": "",
"type": "datasource",
"pluginId": "loki",
"pluginName": "Loki"
}
],
"__elements": {},
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "11.3.1"
},
{
"type": "panel",
"id": "logs",
"name": "Logs",
"version": ""
},
{
"type": "datasource",
"id": "loki",
"name": "Loki",
"version": "1.0.0"
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"panels": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `ERROR` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Errors",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 3,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `WARN` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Warnings",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 2,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `DEBUG` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Debug",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"id": 4,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `TRACE` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Trace",
"type": "logs"
}
],
"schemaVersion": 40,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "New dashboard",
"uid": "ceg90x34pqgowd",
"version": 4,
"weekStart": ""
}

62
docker/loki.yaml Normal file

@@ -0,0 +1,62 @@
# this is mostly the default config from grafana's website
auth_enabled: false
server:
  http_listen_port: 3100
  grpc_listen_port: 9096
  log_level: info
  grpc_server_max_concurrent_streams: 1000
common:
  instance_addr: 127.0.0.1
  path_prefix: /tmp/loki
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory
query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100
limits_config:
  metric_aggregation_enabled: true
schema_config:
  configs:
    - from: 2020-10-24
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h
pattern_ingester:
  enabled: true
  metric_aggregation:
    loki_address: localhost:3100
frontend:
  encoding: protobuf
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
analytics:
  reporting_enabled: false

16
docker/prometheus.yaml Normal file

@@ -0,0 +1,16 @@
global:
  scrape_interval: 5s
  query_log_file: /etc/prometheus/query.log
scrape_configs:
  - job_name: crawler
    static_configs:
      # change this to your machine's ip, localhost won't work
      # because localhost refers to the docker container.
      - targets: ['192.168.8.209:2500']
  - job_name: loki
    static_configs:
      - targets: ['loki:3100']
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']

16
jsconfig.json Normal file

@@ -0,0 +1,16 @@
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "Bundler",
"target": "ES2022",
"jsx": "react",
"allowImportingTsExtensions": true,
"strictNullChecks": true,
"strictFunctionTypes": true
},
"exclude": [
"node_modules",
"**/node_modules/*"
],
"typeAcquisition": {"include": ["firefox-webext-browser"]}
}


@@ -1,2 +0,0 @@
DEFINE TABLE website SCHEMALESS;
DEFINE FIELD accessed_at ON TABLE website VALUE time::now();

src/db.rs

@@ -1,25 +1,36 @@
use std::fmt::Debug;
use metrics::counter;
use serde::{Deserialize, Serialize};
use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
sql::Thing,
Response, Surreal,
engine::remote::ws::{Client, Ws}, error::Db, opt::auth::Root, sql::Thing, Response, Surreal
};
use tracing::{error, instrument, trace, warn};
use url::Url;
use crate::Timer;
use crate::{Config, Timer};
#[derive(Debug, Serialize, Deserialize, Clone)]
const ROUND_TRIP_METRIC: &'static str = "surql_trips";
const STORE: &'static str = "surql_store_calls";
const LINK: &'static str = "surql_link_calls";
#[derive(Serialize, Deserialize, Clone)]
pub struct Website {
/// The url that this data is found at
site: Url,
pub site: Url,
/// Whether or not this link has been crawled yet
pub crawled: bool,
#[serde(skip_serializing)]
id: Option<Thing>,
}
// manual impl to make tracing look nicer
impl Debug for Website {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let site = (self.site.domain().unwrap_or("n/a")).to_string() + self.site.path();
f.debug_struct("Website").field("site", &site).finish()
}
}
impl Website {
/// Creates a blank site (assumes that url param is site's root)
pub fn new(url: &str, crawled: bool) -> Self {
@@ -39,12 +50,9 @@ impl Website {
self.crawled = true
}
pub fn mut_url(&mut self) -> &mut Url {
&mut self.site
}
#[instrument(skip_all)]
pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {
let len = other.len();
if len == 0 {return}
@@ -55,6 +63,8 @@ impl Website {
let timer = Timer::start(&msg);
// prevent the timer from being dropped instantly.
let _ = timer;
counter!(ROUND_TRIP_METRIC).increment(1);
counter!(LINK).increment(1);
match db
.query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)")
.bind(("in", from))
@@ -85,22 +95,29 @@ impl Website {
}
}
#[instrument(skip_all)]
pub async fn store(&mut self, db: &Surreal<Client>) -> Option<Thing> {
#[instrument(name = "surql_store", skip_all)]
pub async fn store(&self, db: &Surreal<Client>) -> Option<Thing> {
counter!(STORE).increment(1);
let counter = counter!(ROUND_TRIP_METRIC);
let t = Timer::start("Stored link");
let _ = t;
counter.increment(1);
// check if it's been gone thru before
let mut response = db
.query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1")
.bind(("site", self.site.to_string()))
.await
.unwrap();
.expect("Failed to check surreal for duplicates!");
if let Some(old) = response.take::<Option<Website>>(0).unwrap() {
if let Some(old) = response.take::<Option<Website>>(0).expect("Failed to read response from surreal for duplicates.") {
// site exists already
if let Some(id) = old.id {
// make sure to preserve the "crawled status"
let mut new = self.clone();
new.crawled = old.crawled | new.crawled;
counter.increment(1);
// update the record
match db.upsert((id.tb, id.id.to_string())).content(new).await {
Ok(e) => {
@@ -110,11 +127,23 @@ impl Website {
}
}
Err(e) => {
error!("{}", e);
match e {
surrealdb::Error::Db(error) => {
match error {
Db::QueryCancelled => todo!(),
Db::QueryNotExecuted => todo!(),
Db::QueryNotExecutedDetail { message: _ } => todo!(),
_=>{},
}
},
_=>{},
}
// error!("{}", e);
}
};
}
} else {
counter.increment(1);
// site doesn't exist in the db yet
match db.create("website").content(self.clone()).await {
Ok(e) => {
@@ -149,19 +178,30 @@ pub struct Record {
pub id: Thing,
}
pub async fn connect() -> surrealdb::Result<Surreal<Client>> {
#[instrument(skip_all, name = "SurrealDB")]
pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
trace!("Establishing connection to surreal...");
// Connect to the server
let db = Surreal::new::<Ws>("127.0.0.1:8000").await?;
let db = Surreal::new::<Ws>(&config.surreal_url).await?;
trace!("Logging in...");
// Signin as a namespace, database, or root user
db.signin(Root {
username: "root",
password: "root",
username: &config.surreal_username,
password: &config.surreal_password,
})
.await?;
// Select a specific namespace / database
db.use_ns("test").use_db("v1.2").await?;
db
.use_ns(&config.surreal_ns)
.use_db(&config.surreal_db)
.await?;
let setup = include_bytes!("setup.surql");
let file = setup.iter().map(|c| *c as char).collect::<String>();
db.query(file).await.expect("Failed to setup surreal tables.");
Ok(db)
}

src/main.rs

@@ -1,60 +1,125 @@
#![feature(ip_from)]
extern crate html5ever;
extern crate markup5ever_rcdom as rcdom;
use std::{fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant};
use db::{connect, Website};
use html5ever::{
local_name, parse_document, tendril::TendrilSink, tree_builder::TreeBuilderOpts, ParseOpts,
};
use rcdom::RcDom;
use std::time::Instant;
use surrealdb::{engine::remote::ws::Client, sql::Thing, Surreal};
use tracing::{debug, info, instrument, trace, trace_span};
use tracing_subscriber::EnvFilter;
use metrics::{counter, gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use s3::S3;
use serde::Deserialize;
use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::task::JoinSet;
use tracing::{debug, info, instrument, trace, trace_span, warn};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
mod db;
mod parser;
mod s3;
const GET_METRIC: &'static str = "total_gets";
const GET_IN_FLIGHT: &'static str = "gets_in_flight";
const SITES_CRAWLED: &'static str = "pages_crawled";
const BEING_PROCESSED: &'static str = "pages_being_processed";
#[derive(Deserialize)]
struct Config {
surreal_ns: String,
surreal_db: String,
surreal_url: String,
surreal_username: String,
surreal_password: String,
s3_url: String,
s3_bucket: String,
s3_access_key: String,
s3_secret_key: String,
crawl_filter: String,
budget: usize,
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_line_number(true)
.without_time()
.init();
debug!("Starting...");
let total_runtime = Timer::start("Completed");
let writer = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open("./docker/logs/tracing.log")
.expect("Couldn't make log file!");
let registry = Registry::default()
.with(
fmt::layer()
.with_line_number(true)
.with_thread_ids(true)
.with_file(true)
// .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
.json()
.with_writer(writer)
// .with_filter(EnvFilter::from_default_env())
);
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
let builder = PrometheusBuilder::new();
builder.with_http_listener(
std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::from_octets([0,0,0,0])), 2500)
)
.install()
.expect("failed to install recorder/exporter");
debug!("Starting...");
// Would probably take these in as parameters from a cli
let url = "https://oliveratkinson.net/";
// let url = "http://localhost:5500";
let budget = 1000;
let starting_url = "https://en.wikipedia.org/";
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let mut crawled = 0;
let db = connect().await.expect("Failed to connect to db, aborting.");
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let client = reqwest::Client::builder()
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
let db = connect(&config)
.await
.expect("Failed to connect to surreal, aborting.");
let s3 = S3::connect(&config)
.await
.expect("Failed to connect to minio, aborting.\n\nThis probably means you need to login to the minio console and get a new access key!\n\n(Probably here) http://localhost:9001/access-keys/new-account\n\n");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls()
.gzip(true)
.build()
.unwrap();
.expect("Failed to build reqwest client.");
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work.
let span = trace_span!("Pre-Loop");
let pre_loop_span = span.enter();
// Download the site
let mut site = Website::new(&url, false);
get(&mut site, &db, &client, &mut crawled).await;
let site = Website::new(&starting_url, false);
get(site, db.clone(), reqwest.clone(), s3.clone()).await;
drop(pre_loop_span);
let span = trace_span!("Loop");
let span = span.enter();
while crawled < budget {
let get_num = if budget - crawled < 100 {
budget - crawled
while crawled < config.budget {
let get_num = if config.budget - crawled < 100 {
config.budget - crawled
} else {
100
};
let uncrawled = get_uncrawled_links(&db, get_num).await;
let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
if uncrawled.len() == 0 {
info!("Had more budget but finished crawling everything.");
return;
@@ -64,136 +129,78 @@ async fn main() {
let span = trace_span!("Crawling");
let _ = span.enter();
for mut site in uncrawled {
get(&mut site, &db, &client, &mut crawled).await;
let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
info!("Crawled {crawled} out of {budget} pages. ({percent})");
{
let mut futures = JoinSet::new();
for site in uncrawled {
gauge!(BEING_PROCESSED).increment(1);
futures.spawn(get(site, db.clone(), reqwest.clone(), s3.clone()));
// let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
// info!("Crawled {crawled} out of {budget} pages. ({percent})");
}
debug!("Joining {} futures...", futures.len());
let c = counter!(SITES_CRAWLED);
// As futures complete runs code in while block
while let Some(_) = futures.join_next().await {
c.increment(1);
gauge!(BEING_PROCESSED).decrement(1);
crawled += 1;
}
}
}
drop(span);
info!("Done");
drop(total_runtime);
}
#[instrument(skip_all)]
/// A quick helper function for downloading a url
async fn get(
site: &mut Website,
db: &Surreal<Client>,
request_client: &reqwest::Client,
count: &mut usize,
) {
#[instrument(skip (db, s3, reqwest))]
/// Downloads, crawls, and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn get(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
trace!("Get: {}", site.to_string());
let timer = Timer::start("Built request");
let request_builder = reqwest.get(site.to_string());
timer.stop();
let g = gauge!(GET_IN_FLIGHT);
g.increment(1);
let timer = Timer::start("Got page");
if let Ok(response) = request_client.get(site.to_string()).send().await {
if let Ok(response) = request_builder.send().await {
timer.stop();
g.decrement(1);
counter!(GET_METRIC).increment(1);
debug!("Getting body...");
// Get body
let data = response.text().await.unwrap();
let opts = ParseOpts {
tree_builder: TreeBuilderOpts {
drop_doctype: true,
..Default::default()
},
..Default::default()
};
// Get DOM
let dom = parse_document(RcDom::default(), opts)
.from_utf8()
.read_from(&mut data.as_bytes())
.unwrap();
// TODO save the dom to minio if a flag is set
// Modify record in database
site.set_crawled();
site.store(db).await;
trace!("Got: {}", site.to_string());
// Walk all the children nodes, searching for links to other pages.
let mut buffer = Vec::new();
let timer = Timer::start("Walked");
walk(&dom.document, &db, &site, &mut buffer).await;
timer.stop();
// Put all the found links into the database.
site.links_to(buffer, &db).await;
*count += 1;
let data = response.text().await.expect("Failed to read http response's body!");
// Store document
s3.store(&data, &site.site).await;
// Parse document and store relationships
parser::parse(&db, &mut site, &data).await;
return;
}
trace!("Failed to get: {}", site.to_string());
}
/// Walks the given site, placing its findings in the database
async fn walk(
node: &rcdom::Handle,
db: &Surreal<Client>,
site: &Website,
links_to: &mut Vec<Thing>,
) {
let span = trace_span!("Walk");
let span = span.enter();
// Match each node - node basically means element.
match &node.data {
rcdom::NodeData::Element { name, attrs, .. } => {
for attr in attrs.borrow().clone() {
match name.local {
local_name!("a")
| local_name!("audio")
| local_name!("area")
| local_name!("img")
| local_name!("link")
| local_name!("object")
| local_name!("source")
| local_name!("base")
| local_name!("video") => {
let attribute_name = attr.name.local.to_string();
if attribute_name == "src"
|| attribute_name == "href"
|| attribute_name == "data"
{
// Get clone of the current site object
let mut web = site.clone();
// Set url
let url = web.mut_url();
url.set_fragment(None); // removes #xyz
let joined = url.join(&attr.value).unwrap();
*url = joined;
// Set other attributes
web.crawled = false;
// TODO set element name
// let element_name = name.local.to_string();
if let Some(id) = web.store(db).await {
links_to.push(id);
}
}
}
local_name!("button") | local_name!("meta") | local_name!("iframe") => {
// dbg!(attrs);
}
_ => {}
};
}
}
_ => {}
};
drop(span);
for child in node.children.borrow().iter() {
Box::pin(walk(child, db, site, links_to)).await;
}
}
/// Returns uncrawled links
async fn get_uncrawled_links(db: &Surreal<Client>, mut count: usize) -> Vec<Website> {
#[instrument(skip(db))]
async fn get_uncrawled_links(
db: &Surreal<Client>,
mut count: usize,
filter: String,
) -> Vec<Website> {
if count > 100 {
count = 100
}
debug!("Getting uncrawled links");
let mut response = db
.query("SELECT * FROM website WHERE crawled = false LIMIT $count")
.query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
.bind(("format", filter))
.bind(("count", count))
.await
.expect("Hard-coded query failed..?");
@@ -218,7 +225,13 @@ impl<'a> Timer<'a> {
pub fn stop(&self) -> f64 {
let dif = self.start.elapsed().as_micros();
let ms = dif as f64 / 1000.;
debug!("{}", format!("{} in {:.3}ms", self.msg, ms));
if ms > 200. {
warn!("{}", format!("{} in {:.3}ms", self.msg, ms));
} else {
trace!("{}", format!("{} in {:.3}ms", self.msg, ms));
}
ms
}
}

109
src/parser.rs Normal file

@@ -0,0 +1,109 @@
use std::default::Default;
use std::str::FromStr;
use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::instrument;
use crate::db::Website;
use crate::Timer;
impl TokenSink for Website {
type Handle = Vec<Website>;
fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
match token {
TagToken(tag) => {
if tag.kind == StartTag {
match tag.name {
local_name!("a")
| local_name!("audio")
| local_name!("area")
| local_name!("img")
| local_name!("link")
| local_name!("object")
| local_name!("source")
| local_name!("base")
| local_name!("video") => {
let mut links = Vec::new();
for attr in &tag.attrs {
let attr_name = attr.name.local.to_string();
if attr_name == "src" || attr_name == "href" || attr_name == "data"
{
// Get clone of the current site object
let mut web = self.clone();
// Set url
let mut url = web.site;
url.set_fragment(None); // removes #xyz
let joined = url.join(&attr.value).expect("Failed to join url during parsing!");
web.site = joined;
web.crawled = false;
links.push(web);
}
}
return TokenSinkResult::Script(links);
}
local_name!("button") | local_name!("meta") | local_name!("iframe") => {
// dbg!(attrs);
}
_ => {}
}
}
}
_ => {}
}
TokenSinkResult::Continue
}
}
#[instrument(skip_all)]
pub async fn parse(db: &Surreal<Client>, site: &mut Website, data: &str) {
// update self in db
site.set_crawled();
site.store(db).await;
// prep work
let mut other_sites: Vec<Website> = Vec::new();
{ // using blocks to prevent compiler's async worries
let _t = Timer::start("Parsed page");
// change data into something that can be tokenized
let chunk = Tendril::from_str(&data).expect("Failed to parse string into Tendril!");
// create buffer of tokens and push our input into it
let mut token_buffer = BufferQueue::default();
token_buffer.push_back(chunk.try_reinterpret::<fmt::UTF8>().expect("Failed to reinterpret chunk!"));
// create the tokenizer
let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
// go thru buffer
while let TokenizerResult::Script(mut sites) = tokenizer.feed(&mut token_buffer) {
other_sites.append(&mut sites);
// other_sites.push(sites);
}
assert!(token_buffer.is_empty());
tokenizer.end();
}
{
let mut links_to = Vec::with_capacity(other_sites.len());
for a in other_sites {
let other = a.store(db).await;
if let Some(o) = other {
links_to.push(o);
}
}
site.links_to(links_to, db).await;
}
}

105
src/s3.rs Normal file

@@ -0,0 +1,105 @@
use base64::{alphabet, engine::{self, general_purpose}, Engine};
use metrics::counter;
use minio::s3::{
args::{BucketExistsArgs, MakeBucketArgs},
client::ClientBuilder,
creds::StaticProvider,
error::Error,
http::BaseUrl,
Client,
};
use tracing::{instrument, trace, warn};
use url::Url;
use crate::{Config, Timer};
const CUSTOM_ENGINE: engine::GeneralPurpose = engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);
const ROUND_TRIP_METRIC: &'static str = "s3_trips";
#[derive(Clone)]
pub struct S3 {
bucket_name: String,
client: Client,
}
impl S3 {
#[instrument(skip_all, name = "S3")]
pub async fn connect(config: &Config) -> Result<Self, Error> {
let base_url = config
.s3_url
.parse::<BaseUrl>()
.expect("Failed to parse url into BaseUrl");
let static_provider =
StaticProvider::new(&config.s3_access_key, &config.s3_secret_key, None);
let client = ClientBuilder::new(base_url)
.provider(Some(Box::new(static_provider)))
.build()?;
trace!("Checking bucket...");
let exists = client
.bucket_exists(
&BucketExistsArgs::new(&config.s3_bucket)
.expect("Failed to check if bucket exists"),
)
.await?;
if !exists {
trace!("Creating bucket...");
client
.make_bucket(
&MakeBucketArgs::new(&config.s3_bucket).expect("Failed to create bucket!"),
)
.await?;
}
trace!("Connection successful");
Ok(Self {
bucket_name: config.s3_bucket.to_owned(),
client: client,
})
}
#[instrument(name = "s3_store", skip_all)]
pub async fn store(&self, data: &str, url: &Url) {
let counter = counter!(ROUND_TRIP_METRIC);
let t = Timer::start("Stored page");
let _ = t; // prevent compiler drop
if let Some(domain) = url.domain() {
let filename = domain.to_owned() + url.path();
trace!("Created filename: {filename} from raw: {}", url.to_string());
counter.increment(1);
let _ = match &self
.client
.put_object_content(&self.bucket_name, &filename, data.to_owned())
.send()
.await {
Ok(_) => {},
Err(err) => {
match err {
Error::InvalidObjectName(_) => {
warn!("Tried storing invalid object name, retrying with Base64 encoding. Last try.");
let filename: String = domain.to_owned() + &CUSTOM_ENGINE.encode(url.path());
counter.increment(1);
let _ = &self
.client
.put_object_content(&self.bucket_name, &filename, data.to_owned())
.send()
.await
.unwrap();
},
_ => {},
}
},
};
}
}
}

3
src/setup.surql Normal file

@@ -0,0 +1,3 @@
DEFINE TABLE IF NOT EXISTS website SCHEMALESS;
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;