Compare commits

80 Commits

Author SHA1 Message Date
2c339a36f9 handle checking for file better 2025-10-09 23:00:11 -06:00
73216f7003 fix the issue where nothing works 2025-10-09 22:35:01 -06:00
1e59ebd5c4 even when not downloading, update the database 2025-10-09 22:13:06 -06:00
52d5e101d0 bragging 2025-10-09 22:03:19 -06:00
5b728bacd6 close #24, make program aware of the files already on disk 2025-10-09 21:52:41 -06:00
b0fe7f4761 close #18, format 2025-10-09 21:52:06 -06:00
5ade5e36df closes #11 2025-08-08 23:39:44 -06:00
95b8af0356 Restart threads that prematurely ended 2025-08-08 23:35:01 -06:00
ad8d7c606d increase csma/ca time 2025-08-08 23:34:45 -06:00
f3a51065b5 remove fixme 2025-07-17 09:37:03 -06:00
343d3a7570 better logging 2025-07-17 09:36:37 -06:00
e535bcc295 Merge branch 'main' of https://git.oliveratkinson.net/Oliver/internet_mapper 2025-07-17 08:59:32 -06:00
a0fd81d956 better config file 2025-07-17 08:58:30 -06:00
5cbba33a09 update how the database interactions work 2025-07-17 08:52:47 -06:00
83def7ba27 close #10 2025-07-16 16:07:37 -06:00
76e78cc745 better logging 2025-07-16 16:02:16 -06:00
b4038b76dd fix prometheus lol 2025-07-16 16:02:07 -06:00
caa523f1eb cleanup 2025-07-16 11:48:23 -06:00
f7bb0eef16 turn program into batch_size parrallel downloaders 2025-07-16 11:47:42 -06:00
865f9be8c0 Merge pull request 'works 😄' (#16) from tempfiles into main
Reviewed-on: #16
2025-07-16 02:26:14 +00:00
48abc73092 works 😄 2025-07-15 20:25:44 -06:00
0061866976 Merge pull request 'traces and new metrics work' (#13) from better_metrics into main
Reviewed-on: #13
2025-07-16 00:58:47 +00:00
9662b68b0d traces and new metrics work 2025-07-10 23:44:23 -06:00
6f98001d8e Merge pull request 'status_codes' (#8) from status_codes into main
Reviewed-on: #8
2025-07-11 00:49:27 +00:00
6790061e22 helper code 2025-07-09 15:58:22 -06:00
50606bb69e It isnt quite working yet 2025-04-17 09:59:23 -06:00
5850f19cab Merge pull request 'stream_response' (#6) from stream_response into main
Reviewed-on: #6
2025-04-17 15:39:49 +00:00
2c8546e30a logging cleanup 2025-04-17 09:36:27 -06:00
4e619d0ebc logging cleanup 2025-04-17 09:36:13 -06:00
647c4cd324 work off content-type header 2025-04-17 09:35:57 -06:00
7fab961d76 no longer how this is working 2025-04-17 09:35:26 -06:00
d3fff194f4 logging updates 2025-04-17 08:17:37 -06:00
3497312fd4 de-enshitified file saving logic 2025-04-17 08:17:29 -06:00
0fd76b1734 Merge pull request 'stream_response' (#4) from stream_response into main
Reviewed-on: #4
2025-04-15 21:23:54 +00:00
9bfa8f9108 batch_size 2025-04-15 13:38:28 -06:00
bdb1094a30 steam data to the disk 2025-04-15 13:07:47 -06:00
9aa2d9ce22 code settings 2025-04-15 13:06:53 -06:00
4b557a923c Merge pull request 'foss_storage' (#3) from foss_storage into main
Reviewed-on: #3
2025-04-15 15:11:59 +00:00
c08a20ac00 cleanup and more accuratly use metrics 2025-04-15 09:07:16 -06:00
94912e9125 change up how files are discovered 2025-04-15 09:06:57 -06:00
a9465dda6e add instructions 2025-03-31 15:05:18 -06:00
add6f00ed6 no recomp needed 2025-03-31 14:53:10 -06:00
4a433a1a77 This function sometimes throws errors, this logging should help 2025-03-31 14:18:37 -06:00
03cbcd9ae0 remove minio code 2025-03-31 14:18:11 -06:00
6fc71c7a78 add speed improvements 2025-03-21 12:14:29 -06:00
96a3ca092a :) 2025-03-21 12:11:05 -06:00
b750d88d48 working filesystem storage 2025-03-21 11:42:43 -06:00
808790a7c3 file patch; 2025-03-21 07:11:51 +00:00
2de01b2a0e remove removed code 2025-03-21 06:48:39 +00:00
be0fd5505b i think the files work better 2025-03-21 06:48:17 +00:00
a23429104c dead code removal 2025-03-21 06:03:34 +00:00
66581cc453 getting there 2025-03-21 05:59:40 +00:00
7df19a480f updates 2025-03-20 15:11:01 -06:00
b9c1f0b492 readme updates 2025-03-19 15:05:32 -06:00
71b7b2d7bc it works and it is awesome 2025-03-19 15:04:00 -06:00
bac3cd9d1d add most recent long run 2025-03-19 15:03:49 -06:00
1f6a0acce3 shutup spellchecker 2025-03-19 15:03:39 -06:00
53dbf53ab9 newest settings 2025-03-19 15:03:24 -06:00
0477bb26e4 viz improvements 2025-03-19 15:03:11 -06:00
6409baaffb Reducted trips to surreal by x500 2025-03-19 12:41:08 -06:00
135a7e4957 Merge pull request 'multithreading' (#2) from multithreading into main
Reviewed-on: #2
2025-03-19 05:00:59 +00:00
9aa34b3eee epic metrics 2025-03-19 04:59:50 +00:00
de80418c00 better logging 2025-03-18 16:09:46 -06:00
e3e4175f51 logging improvements 2025-03-18 15:25:56 -06:00
d11e7dd27c the biggest 1 line improvement ever 2025-03-18 15:25:40 -06:00
f2a3e836a0 spelling and clippy 2025-03-18 15:08:29 -06:00
3b4e6a40ce minimize vec resizing 2025-03-18 15:07:50 -06:00
bd0b946245 fixed tracing 2025-03-18 15:02:32 -06:00
b7540a4680 checkpoint - onto profiling 2025-03-18 10:53:06 -06:00
Oliver Atkinson 82929fd0fc updating for base64 2024-12-13 13:28:24 -07:00
Oliver Atkinson f42e770a10 moved to other repo 2024-12-13 11:01:35 -07:00
Oliver Atkinson 611a1e923b starting on the extension 2024-12-12 15:32:04 -07:00
Oliver Atkinson 298ad39a79 rename 2024-12-12 14:59:54 -07:00
Oliver Atkinson 215056e493 use contains operator for better output 2024-12-12 14:26:49 -07:00
Oliver Atkinson 22be3b2f61 updating deps 2024-12-12 14:14:38 -07:00
Oliver Atkinson c1c8cf07bb unifed settings for testing 2024-12-12 11:42:07 -07:00
0f8a3d7215 using a custom parser now :) 2024-11-12 23:08:09 -07:00
574a370f30 readme updates 2024-11-12 21:24:57 -07:00
eaa79b749e prepare get function for s3 2024-11-12 21:19:05 -07:00
2c28d69d55 add s3 support 2024-11-12 21:03:58 -07:00
23 changed files with 3251 additions and 918 deletions

6
.gitignore vendored

@@ -1,5 +1,9 @@
 /target
 /.surrealdb
+/.minio
 perf.data
 flamegraph.svg
 perf.data.old
+/docker/logs/*
+/downloaded
+/Crawler.toml

19
.vscode/launch.json vendored

@@ -7,18 +7,15 @@
         {
             "type": "lldb",
             "request": "launch",
-            "name": "Debug executable 'surreal_spider'",
-            "env": {
-                "RUST_LOG": "surreal_spider=debug,reqwest=info",
-            },
+            "name": "Debug executable 'internet_mapper'",
             "cargo": {
                 "args": [
                     "build",
-                    "--bin=surreal_spider",
-                    "--package=surreal_spider"
+                    "--bin=internet_mapper",
+                    "--package=internet_mapper"
                 ],
                 "filter": {
-                    "name": "surreal_spider",
+                    "name": "internet_mapper",
                     "kind": "bin"
                 }
             },
@@ -28,16 +25,16 @@
         {
             "type": "lldb",
             "request": "launch",
-            "name": "Debug unit tests in executable 'surreal_spider'",
+            "name": "Debug unit tests in executable 'internet_mapper'",
             "cargo": {
                 "args": [
                     "test",
                     "--no-run",
-                    "--bin=surreal_spider",
-                    "--package=surreal_spider"
+                    "--bin=internet_mapper",
+                    "--package=internet_mapper"
                 ],
                 "filter": {
-                    "name": "surreal_spider",
+                    "name": "internet_mapper",
                     "kind": "bin"
                 }
             },

8
.vscode/settings.json vendored Normal file

@@ -0,0 +1,8 @@
{
"cSpell.words": [
"creds",
"reqwest",
"rustls",
"surql",
]
}

2003
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@@ -1,15 +1,23 @@
 [package]
-name = "surreal_spider"
+name = "internet_mapper"
 version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-html5ever = "0.29.0"
-markup5ever_rcdom = "0.5.0-unofficial"
-reqwest = "0.12.9"
-serde = { version = "1.0.214", features = ["derive"] }
-surrealdb = "2.0.4"
+base64 = "0.22.1"
+futures-util = "0.3.31"
+html5ever = "0.29"
+metrics = "0.24.1"
+metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
+opentelemetry = "0.30.0"
+opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] }
+opentelemetry_sdk = "0.30.0"
+rand = "0.9.1"
+reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
+serde = { version = "1.0", features = ["derive"] }
+surrealdb = "2.2"
 tokio = { version="1.41.0", features = ["full"] }
-tracing = "0.1.40"
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
-url = { version = "2.5.3", features = ["serde"] }
+toml = "0.8.20"
+tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
+url = { version = "2.5", features = ["serde"] }

22
Crawler.toml Normal file

@@ -0,0 +1,22 @@
# Visability config
# Alloy (for Tempo)
tracing_endpoint = "http://localhost:4317"
# Prometheus
metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
# Alloy (for Loki)
log_file = "./docker/logs/tracing.log"
# Surreal config
surreal_url = "localhost:8000"
surreal_username = "root"
surreal_password = "root"
surreal_ns = "test"
surreal_db = "v1.21.1"
# Crawler config
crawl_filter = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
# crawl_filter = "https://oliveratkinson.net"
start_url = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
# start_url = "https://oliveratkinson.net"
budget = 100
batch_size = 2
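
For reference, this file is deserialized directly into the `Config` struct added in `src/main.rs` further down (via `serde` and the `toml` crate from `Cargo.toml`). A minimal standalone sketch of that load step, assuming the field names shown in that diff:

```rust
use serde::Deserialize;
use std::fs;

// Mirrors the `Config` struct from the src/main.rs diff below.
#[derive(Deserialize)]
struct Config {
    tracing_endpoint: String,
    metrics_endpoint: String,
    log_file: String,
    surreal_url: String,
    surreal_username: String,
    surreal_password: String,
    surreal_ns: String,
    surreal_db: String,
    crawl_filter: String,
    start_url: String,
    budget: usize,
    batch_size: usize,
}

fn main() {
    // Read and parse the TOML shown above; a missing or misspelled key fails here.
    let raw = fs::read_to_string("./Crawler.toml").expect("Failed to read Crawler.toml");
    let config: Config = toml::from_str(&raw).expect("Failed to parse Crawler.toml");
    println!("Crawling {} with a budget of {}", config.start_url, config.budget);
}
```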

README.md

@@ -1,23 +1,59 @@
 # Surreal Crawler
-
-Mapping with a budget of 1000 (crawl 1000 sites, so many more links are actually discovered), on [my webiste](https://oliveratkinson.net) on 8/26/2024 took 1m9s.
-
-This is including the crawl and loading into the database and linking sites. (Locally hosted surreal db instance)
-
-This run created 4299 site links with 23286 links between the sites. (It found my this git site which really bolsters those numbers.)
-
-## Install / Build
-
-* You will need rust to compile the crawler [rustup.rs](https://rustup.rs)
-* You need python3 (will come installed on most linux distros) and poetry for dependancy management.
-    * Install `pipx`, `python3`
-    * Then: `pipx install poetry`
-    * Then: `poetry install` to install the project dependancies
-* You need to install [surrealdb](https://surrealdb.com)
-
-## Use
-
-Just run `./crawl.sh {url}` and it will start crawling. You can tweak the budget inside [crawl.sh](https://git.oliveratkinson.net/Oliver/internet_mapper/src/branch/main/crawl.sh) if you want.
-
-You can also prefix the command with `time` to benchmark the system, such as: `time ./crawl.sh https://discord.com`.
+
+Crawls sites saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links untill the crawl budget is reached. It saves the data of each site in a minio database.
+
+## How to use
+
+1. Clone the repo and `cd` into it.
+2. Build the repo with `cargo build -r`
+3. Start the docker conatiners
+    1. cd into the docker folder `cd docker`
+    2. Bring up the docker containers `docker compose up -d`
+4. From the project's root, edit the `Crawler.toml` file to your liking.
+5. Run with `./target/release/internet_mapper`
+
+You can view stats of the project at `http://<your-ip>:3000/dashboards`
+
+```bash
+# Untested script but probably works
+git clone https://git.oliveratkinson.net/Oliver/internet_mapper.git
+cd internet_mapper
+
+cargo build -r
+
+cd docker
+docker compose up -d
+cd ..
+
+$EDITOR Crawler.toml
+
+./target/release/internet_mapper
+```
+
+### TODO
+
+- [x] Domain filtering - prevent the crawler from going on alternate versions of wikipedia.
+- [ ] Conditionally save content - based on filename or file contents
+- [x] GUI / TUI ? - Graphana
+- [x] Better asynchronous getting of the sites. Currently it all happens serially.
+- [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need
+- [x] Control crawler via config file (no recompliation needed)
+
+### Feats
+
+3/17/25: Took >1hr to crawl 100 pages.
+
+3/19/25: Took 20min to crawl 1000 pages.
+This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
+
+3/20/25: Took 5min to crawl 1000 pages.
+
+3/21/25: Took 3min to crawl 1000 pages.
+
+7/.../25: Downloaded just shy of 12TB of data from a remote server.
+
+# About
+
+![Screenshot](/pngs/graphana.png)


@@ -1,16 +0,0 @@
services:
db:
image: surrealdb/surrealdb:latest-dev
ports:
- 8000:8000
volumes:
- ./.surrealdb/:/mydata
command:
- start
- --log
- debug
- --user
- root
- --pass
- root
- rocksdb:/mydata/database.db

36
docker/alloy.conf Normal file

@@ -0,0 +1,36 @@
local.file_match "tmplogs" {
path_targets = [{"__path__" = "/tmp/alloy-logs/*.log"}]
}
loki.source.file "local_files" {
targets = local.file_match.tmplogs.targets
forward_to = [loki.write.local_loki.receiver]
}
loki.write "local_loki" {
endpoint {
url = "http://loki:3100/loki/api/v1/push"
}
}
otelcol.receiver.otlp "otlp_receiver" {
grpc {
endpoint = "0.0.0.0:4317"
}
http {
endpoint = "0.0.0.0:4318"
}
output {
traces = [otelcol.exporter.otlp.tempo.input,]
}
}
otelcol.exporter.otlp "tempo" {
client {
endpoint = "tempo:4317"
tls {
insecure = true
}
}
}
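
This Alloy config is the receiving end of the crawler's telemetry: it tails the JSON log file under `/tmp/alloy-logs` for Loki and accepts OTLP traffic on 4317/4318, forwarding traces to Tempo. A trimmed sketch of the sending side (the `load_tracing` setup that appears in the `src/main.rs` diff), assuming the `opentelemetry-otlp` 0.30 API pinned in `Cargo.toml`; `init_tracing` is an illustrative name:

```rust
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::SdkTracerProvider;

/// Build a tracer provider that exports spans to the OTLP gRPC listener
/// declared in the `otelcol.receiver.otlp` block above
/// (e.g. the `tracing_endpoint` "http://localhost:4317" from Crawler.toml).
fn init_tracing(endpoint: &str) -> SdkTracerProvider {
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build()
        .expect("failed to build OTLP span exporter");

    // main() then registers this with opentelemetry::global::set_tracer_provider().
    SdkTracerProvider::builder()
        .with_simple_exporter(exporter)
        .build()
}
```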

92
docker/compose.yml Normal file

@@ -0,0 +1,92 @@
services:
# Database
surreal:
image: surrealdb/surrealdb:latest-dev
ports:
- 8000:8000
volumes:
- surrealdb_storage:/mydata
command:
- start
- --log
- debug
- --user
- root
- --pass
- root
- rocksdb:/mydata/database.db
# Tracing
tempo:
image: grafana/tempo:latest
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./tempo.yaml:/etc/tempo.yaml
- tempo_storage:/var/tempo
ports:
- 3200:3200 # self metrics for prometheus
- 4317:4317 # otlp grpc - (alloy)
# Log scraper
alloy:
image: grafana/alloy:latest
ports:
- 12345:12345
volumes:
# if you change this, you also need to change it in the alloy config file
- ./logs/:/tmp/alloy-logs
- ./alloy.conf:/etc/alloy/config.alloy
- alloy_storage:/var/lib/alloy
command:
- run
- --server.http.listen-addr=0.0.0.0:12345
- --storage.path=/var/lib/alloy/data
- /etc/alloy/config.alloy
# Log storage / analysis
loki:
image: grafana/loki:latest
ports:
- 3100:3100
command: -config.file=/etc/loki/local-config.yaml
volumes:
- ./loki.yaml:/etc/loki/local-config.yaml
# Metrics
prometheus:
image: prom/prometheus:latest
ports:
- 9090:9090
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
# persist data
# - prometheus_storage:/prometheus
command:
- --enable-feature=native-histograms
- --web.enable-remote-write-receiver
- --web.enable-lifecycle
- --web.enable-otlp-receiver
- --config.file=/etc/prometheus/prometheus.yml
# Everything viewer
grafana:
image: grafana/grafana:latest
volumes:
- ./grafana.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
- ./dashboards:/var/lib/grafana/dashboards
- grafana_storage:/var/lib/grafana
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
ports:
- 3000:3000
volumes:
prometheus_storage:
grafana_storage:
alloy_storage:
surrealdb_storage:
tempo_storage:


@@ -0,0 +1,648 @@
{
"__inputs": [
{
"name": "DS_PROMETHEUS",
"label": "Prometheus",
"description": "",
"type": "datasource",
"pluginId": "prometheus",
"pluginName": "Prometheus"
},
{
"name": "DS_LOKI",
"label": "Loki",
"description": "",
"type": "datasource",
"pluginId": "loki",
"pluginName": "Loki"
}
],
"__elements": {},
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "11.3.1"
},
{
"type": "panel",
"id": "logs",
"name": "Logs",
"version": ""
},
{
"type": "datasource",
"id": "loki",
"name": "Loki",
"version": "1.0.0"
},
{
"type": "datasource",
"id": "prometheus",
"name": "Prometheus",
"version": "1.0.0"
},
{
"type": "panel",
"id": "stat",
"name": "Stat",
"version": ""
},
{
"type": "panel",
"id": "timeseries",
"name": "Time series",
"version": ""
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": 300000,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 8,
"x": 0,
"y": 0
},
"id": 5,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "surql_trips",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "Trips to Surreal",
"range": true,
"refId": "A",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "s3_trips",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "Trips to S3",
"range": true,
"refId": "B",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "pages_crawled",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "total crawled",
"range": true,
"refId": "C",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "pages_being_processed",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "Pages being processed",
"range": true,
"refId": "E",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "gets_in_flight",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "D",
"useBackend": false
}
],
"title": "Crawler stats",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": 300000,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 9,
"x": 8,
"y": 0
},
"id": 6,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "surql_trips",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "Trips to Surreal",
"range": true,
"refId": "A",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "surql_link_calls",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "link calls",
"range": true,
"refId": "B",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "surql_store_calls",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "store calls",
"range": true,
"refId": "C",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "pages_being_processed",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "Pages being processed",
"range": true,
"refId": "E",
"useBackend": false
}
],
"title": "Surreal stats",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "This is across all threads, so this isn't wall clock time",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 7,
"x": 17,
"y": 0
},
"id": 7,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showPercentChange": false,
"textMode": "auto",
"wideLayout": true
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "surql_lock_waiting_ms",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Time spend waiting on lock",
"type": "stat"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 18,
"w": 24,
"x": 0,
"y": 8
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `ERROR` | line_format \"{{.threadId}} {{.filename_extracted}}:{{.line_number}} {{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Errors",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 26
},
"id": 2,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `DEBUG` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Debug",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 16,
"w": 12,
"x": 12,
"y": 26
},
"id": 4,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `TRACE` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Trace",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 34
},
"id": 3,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": false,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.3.1",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "${DS_LOKI}"
},
"editorMode": "code",
"expr": "{filename=\"/tmp/alloy-logs/tracing.log\"} | json | level = `WARN` | line_format \"{{.fields_message}}\"",
"queryType": "range",
"refId": "A"
}
],
"title": "Warnings",
"type": "logs"
}
],
"schemaVersion": 40,
"tags": [],
"templating": {
"list": [
{
"datasource": {
"type": "loki",
"uid": "P8E80F9AEF21F6940"
},
"filters": [],
"name": "Filters",
"type": "adhoc"
}
]
},
"time": {
"from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "Crawler",
"uid": "ceg90x34pqgowd",
"version": 21,
"weekStart": ""
}

41
docker/grafana.yaml Normal file

@@ -0,0 +1,41 @@
apiVersion: 1
datasources:
- name: Loki
type: loki
access: proxy
orgId: 1
url: http://loki:3100
basicAuth: false
isDefault: true
version: 1
editable: false
- name: Prometheus
type: prometheus
uid: prometheus
access: proxy
orgId: 1
url: http://prometheus:9090
basicAuth: false
isDefault: false
version: 1
editable: false
jsonData:
httpMethod: GET
- name: Tempo
type: tempo
access: proxy
orgId: 1
url: http://tempo:3200
basicAuth: false
isDefault: false
version: 1
editable: true
apiVersion: 1
uid: tempo
jsonData:
httpMethod: GET
serviceMap:
datasourceUid: prometheus
streamingEnabled:
search: true

62
docker/loki.yaml Normal file

@@ -0,0 +1,62 @@
# this is mostly the default config from grafana's website
auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: info
grpc_server_max_concurrent_streams: 1000
common:
instance_addr: 127.0.0.1
path_prefix: /tmp/loki
storage:
filesystem:
chunks_directory: /tmp/loki/chunks
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 100
limits_config:
metric_aggregation_enabled: true
schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
pattern_ingester:
enabled: true
metric_aggregation:
loki_address: localhost:3100
frontend:
encoding: protobuf
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
analytics:
reporting_enabled: false

15
docker/prometheus.yaml Normal file

@@ -0,0 +1,15 @@
global:
scrape_interval: 60s
query_log_file: /etc/prometheus/query.log
scrape_configs:
# Crawler configs get pushed with OTLP
# - job_name: 'loki'
# static_configs:
# - targets: ['loki:3100']
# - job_name: 'prometheus'
# static_configs:
# - targets: ['localhost:9090']
- job_name: 'tempo'
static_configs:
- targets: ['tempo:3200']

48
docker/tempo.yaml Normal file

@@ -0,0 +1,48 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info
query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
metadata_slo:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: "tempo:4317"
metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces
storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks
overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both

16
jsconfig.json Normal file

@@ -0,0 +1,16 @@
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "Bundler",
"target": "ES2022",
"jsx": "react",
"allowImportingTsExtensions": true,
"strictNullChecks": true,
"strictFunctionTypes": true
},
"exclude": [
"node_modules",
"**/node_modules/*"
],
"typeAcquisition": {"include": ["firefox-webext-browser"]}
}

BIN
pngs/graphana.png Normal file

Binary file not shown. Size: 264 KiB


@@ -1,2 +0,0 @@
DEFINE TABLE website SCHEMALESS;
DEFINE FIELD accessed_at ON TABLE website VALUE time::now();

207
src/db.rs

@@ -1,23 +1,40 @@
use metrics::counter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt::Debug, time::Duration};
use surrealdb::{ use surrealdb::{
engine::remote::ws::{Client, Ws}, engine::remote::ws::{Client, Ws},
opt::auth::Root, opt::auth::Root,
sql::Thing, sql::Thing,
Response, Surreal, Surreal,
}; };
use tracing::{error, instrument, trace, warn}; use tokio::time::sleep;
use tracing::{error, instrument, trace};
use url::Url; use url::Url;
use crate::Timer; use crate::Config;
#[derive(Debug, Serialize, Deserialize, Clone)] const STORE: &str = "surql_store_calls";
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
pub struct Website { pub struct Website {
pub id: Option<Thing>,
/// The url that this data is found at /// The url that this data is found at
site: Url, pub site: Url,
/// Wether or not this link has been crawled yet /// Wether or not this link has been crawled yet
pub crawled: bool, pub crawled: bool,
#[serde(skip_serializing)] /// 200, 404, etc
id: Option<Thing>, pub status_code: u16,
}
// manual impl to make tracing look nicer
impl Debug for Website {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Website")
.field("host", &self.site.host())
.field("path", &self.site.path())
.field("status_code", &self.status_code)
.finish()
}
} }
impl Website { impl Website {
@@ -28,140 +45,112 @@ impl Website {
Err(_) => todo!(), Err(_) => todo!(),
}; };
Self { Self {
id: None,
crawled, crawled,
site, site,
status_code: 0,
id: None,
} }
} }
pub fn set_crawled(&mut self) { // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
trace!("Set crawled to true"); // if already in the database as such or incoming data is TRUE.
self.crawled = true #[instrument(skip(db))]
} pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
counter!(STORE).increment(1);
let mut things = Vec::with_capacity(all.len());
pub fn mut_url(&mut self) -> &mut Url {
&mut self.site
}
#[instrument(skip_all)]
pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {
let len = other.len();
if len == 0 {return}
let from = self.site.to_string();
// let to = other.site.to_string();
trace!("Linking {from} to {} other pages.", other.len());
let msg = format!("Linked {len} pages");
let timer = Timer::start(&msg);
// prevent the timer from being dropped instantly.
let _ = timer;
match db match db
.query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)") .query(
.bind(("in", from)) "INSERT INTO website $array
.bind(("out", other)) ON DUPLICATE KEY UPDATE
accessed_at = time::now(),
status_code = $input.status_code,
processing = false,
crawled = crawled OR $input.crawled
RETURN VALUE id;
",
)
.bind(("array", all))
.await .await
{ {
Ok(mut e) => { Ok(mut id) => match id.take::<Vec<Thing>>(0) {
// The relate could technically "fail" (not relate anything), this just means that Ok(mut x) => things.append(&mut x),
// the query was ok. Err(err) => error!("{:?}", err),
let _: Response = e;
if let Ok(vec) = e.take(0) {
let _: Vec<usize> = vec;
if let Some(num) = vec.get(0) {
if *num == len {
trace!("Link OK");
return;
} else {
warn!("Didn't link all the records. {num}/{len}");
return;
}
}
}
warn!("Linking request succeeded but couldn't verify the results.");
}, },
Err(e) => { Err(err) => {
error!("{}", e.to_string()); error!("{:?}", err);
},
}
}
#[instrument(skip_all)]
pub async fn store(&mut self, db: &Surreal<Client>) -> Option<Thing> {
// check if it's been gone thru before
let mut response = db
.query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1")
.bind(("site", self.site.to_string()))
.await
.unwrap();
if let Some(old) = response.take::<Option<Website>>(0).unwrap() {
// site exists already
if let Some(id) = old.id {
// make sure to preserve the "crawled status"
let mut new = self.clone();
new.crawled = old.crawled | new.crawled;
// update the record
match db.upsert((id.tb, id.id.to_string())).content(new).await {
Ok(e) => {
if let Some(a) = e {
let _: Record = a;
return Some(a.id);
}
}
Err(e) => {
error!("{}", e);
}
};
} }
} else {
// sites hasn't existed yet
match db.create("website").content(self.clone()).await {
Ok(e) => {
let _: Option<Record> = e;
if let Some(a) = e {
let _: Record = a;
return Some(a.id);
}
}
Err(a) => error!("{:?}", a),
};
} }
None things
} }
} }
impl ToString for Website { /// Returns uncrawled links
fn to_string(&self) -> String { #[instrument(skip(db, config))]
self.site.to_string() pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website> {
let mut res: Option<Website> = None;
let mut fails = 0;
while res == None {
let mut response = db
.query("fn::get_next($format)")
.bind(("format", config.crawl_filter.to_string()))
.await
.expect("Hard-coded query failed..?");
res = match response.take(0) {
Ok(ok) => ok,
Err(_err) => {
// basically just CSMA/CA
let delay = rand::random_range(10..10_000);
sleep(Duration::from_millis(delay)).await;
fails += 1;
// Don't get stuck here forever, failing...
// (most I've seen is 1)
if fails > 5 {
error!("Max attempts to get_next() reached... ({fails})");
return None
}
None
}
};
} }
res
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[allow(dead_code)]
pub struct Email { pub struct Email {
pub email: String, pub email: String,
pub on: String, pub on: String,
} }
#[derive(Debug, Deserialize)] #[instrument(skip_all, name = "SurrealDB")]
pub struct Record { pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
#[allow(dead_code)] trace!("Establishing connection to surreal...");
pub id: Thing,
}
pub async fn connect() -> surrealdb::Result<Surreal<Client>> {
// Connect to the server // Connect to the server
let db = Surreal::new::<Ws>("127.0.0.1:8000").await?; let db = Surreal::new::<Ws>(&config.surreal_url).await?;
trace!("Logging in...");
// Signin as a namespace, database, or root user // Signin as a namespace, database, or root user
db.signin(Root { db.signin(Root {
username: "root", username: &config.surreal_username,
password: "root", password: &config.surreal_password,
}) })
.await?; .await?;
// Select a specific namespace / database // Select a specific namespace / database
db.use_ns("test").use_db("v1.2").await?; db.use_ns(&config.surreal_ns)
.use_db(&config.surreal_db)
.await?;
let setup = include_bytes!("setup.surql");
let init_commands = setup.iter().map(|c| *c as char).collect::<String>();
db.query(init_commands)
.await
.expect("Failed to setup surreal tables.");
Ok(db) Ok(db)
} }
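
One detail worth calling out from the `get_next` change above: when the query races with other workers it backs off for a random 10 ms to 10 s delay (the "CSMA/CA" comment, tuned in commit ad8d7c606d) and gives up after a handful of failures. A generic sketch of that retry shape, with the SurrealDB query replaced by an arbitrary async operation; the `retry_with_backoff` name and signature are illustrative, not from the diff:

```rust
use std::time::Duration;
use tokio::time::sleep;

/// Retry `op` with a random delay between attempts, giving up after `max_fails` failures.
async fn retry_with_backoff<T, F, Fut>(mut op: F, max_fails: u32) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    let mut fails = 0;
    loop {
        if let Some(value) = op().await {
            return Some(value);
        }
        fails += 1;
        if fails > max_fails {
            // Same bail-out as get_next(): don't spin here forever.
            return None;
        }
        // Same spread as the diff: wait 10ms..10s before trying again.
        let delay = rand::random_range(10..10_000);
        sleep(Duration::from_millis(delay)).await;
    }
}
```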

105
src/filesystem.rs Normal file

@@ -0,0 +1,105 @@
use std::{io::ErrorKind, path::PathBuf};
use reqwest::header::HeaderValue;
use tokio::fs;
use tracing::{error, trace, warn};
use url::Url;
pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
// extract data from url to save it accurately
let mut url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
if let Ok(header) = content_type.to_str() {
// text/html; charset=UTF-8; option=value
let ttype = if let Some((t, _)) = header.split_once(';') {
t
} else {
header
};
if let Some((ttype, subtype)) = ttype.split_once('/') {
trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
// If the Content-Type header is "*/html" (most likely "text/html") and the path's
// extension is anything but html:
if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
// time to slap a index.html to the end of that path there!
url_path = url_path.join("index.html");
}
}
} else {
warn!("Header: {:?} couldn't be parsed into a string!", content_type);
}
trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
url_path
}
pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
match tokio::fs::OpenOptions::new()
.write(false)
.read(true)
.create(false)
.open(file).await
{
Ok(file) => {
match file.metadata().await {
Ok(meta) => {
return Some(meta.len())
},
Err(err) => {
error!("Failed to get metadata. {}", err)
},
}
},
Err(err) => {
match err.kind() {
ErrorKind::NotFound => {/* ignore */},
_ => warn!("Failed to open file to check length. {:?} {}", file, err),
}
},
}
None
}
pub async fn init(filename: &PathBuf) -> Option<fs::File> {
let file = async || tokio::fs::OpenOptions::new()
.write(true)
.append(false)
.create(true)
.open(&filename).await;
match file().await {
Ok(ok) => {
trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
Some(ok)
},
Err(err) => {
// the file/folder isn't found
if err.kind() == ErrorKind::NotFound {
if let Some(parent ) = &filename.parent() {
// create the folders
if let Err(err) = fs::create_dir_all(&parent).await {
error!("Dir creation: {err} {:?}", filename);
} else if let Ok(ok) = file().await {
return Some(ok);
}
} else {
error!("Couldn't get file's parents: {:?}", &filename);
}
} else if err.kind() == ErrorKind::NotADirectory {
// Example:
// 1. example.com/user
// 2. example.com/user/post
// If file 1 exists it will prevent file 2 from existing
// FIXME
error!("One of the parent directories is actually a file...")
} else {
error!("File open error: {err} {:?}", filename);
}
// we don't care about other errors, we can't/shouldn't fix them
None
}
}
}
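
A hypothetical unit test (not part of this diff, assumed to sit at the bottom of `src/filesystem.rs`) that pins down the `as_path` behaviour described above: HTML responses whose URL lacks an `.html`/`.htm` extension get `index.html` appended, while everything else keeps its path.

```rust
#[cfg(test)]
mod tests {
    use super::as_path;
    use reqwest::header::HeaderValue;
    use std::path::PathBuf;
    use url::Url;

    #[test]
    fn html_urls_get_an_index_file() {
        let ct = HeaderValue::from_static("text/html; charset=UTF-8");

        // A directory-style URL has no extension, so index.html is appended.
        let dir = Url::parse("https://example.com/docs/").unwrap();
        assert_eq!(
            as_path(&dir, &ct),
            PathBuf::from("./downloaded/example.com/docs/index.html")
        );

        // A URL that already ends in .html is left untouched.
        let page = Url::parse("https://example.com/docs/page.html").unwrap();
        assert_eq!(
            as_path(&page, &ct),
            PathBuf::from("./downloaded/example.com/docs/page.html")
        );
    }
}
```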

src/main.rs

@@ -1,230 +1,415 @@
#![feature(ip_from)]
#![feature(path_add_extension)]
#![deny(clippy::unwrap_used)]
extern crate html5ever; extern crate html5ever;
extern crate markup5ever_rcdom as rcdom;
use std::{
collections::HashSet,
fs::File,
io::Read,
sync::{Arc, LazyLock},
};
use db::{connect, Website}; use db::{connect, Website};
use html5ever::{ use futures_util::StreamExt;
local_name, parse_document, tendril::TendrilSink, tree_builder::TreeBuilderOpts, ParseOpts, use opentelemetry::{
global::{self},
metrics::{Counter, Meter, UpDownCounter},
}; };
use rcdom::RcDom; use opentelemetry_otlp::{Protocol, WithExportConfig};
use std::time::Instant; use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
use surrealdb::{engine::remote::ws::Client, sql::Thing, Surreal}; use serde::Deserialize;
use tracing::{debug, info, instrument, trace, trace_span}; use surrealdb::{engine::remote::ws::Client, Surreal};
use tracing_subscriber::EnvFilter; use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
sync::RwLock,
task::JoinSet,
};
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
use crate::db::get_next;
mod db; mod db;
mod filesystem;
mod parser;
static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
static BATCH_SIZE: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_processed")
.build()
});
static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_parsed")
.build()
});
static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_streamed")
.build()
});
static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
static SITES_CRAWLED: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
static CONFIG: LazyLock<Config> = LazyLock::new(|| {
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
config
});
// FIXME Traces aren't working on multiple threads, they block
// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
#[derive(Deserialize)]
struct Config {
tracing_endpoint: String,
metrics_endpoint: String,
log_file: String,
surreal_ns: String,
surreal_db: String,
surreal_url: String,
surreal_username: String,
surreal_password: String,
crawl_filter: String,
start_url: String,
budget: usize,
batch_size: usize,
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt() println!("Logs and metrics are provided to the Grafana dashboard");
.with_env_filter(EnvFilter::from_default_env())
.with_line_number(true)
.without_time()
.init();
debug!("Starting...");
// Would probably take these in as parameters from a cli // Start TRACE / LOGGING / METRICS
let url = "https://oliveratkinson.net/"; load_logging(&CONFIG); // this seems to be working ok
// let url = "http://localhost:5500"; global::set_tracer_provider(load_tracing(&CONFIG));
let budget = 1000; global::set_meter_provider(load_metrics(&CONFIG));
let mut crawled = 0;
let db = connect().await.expect("Failed to connect to db, aborting."); BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);
let client = reqwest::Client::builder() // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let crawled = Arc::new(RwLock::new(0));
let starting_url = &CONFIG.start_url;
let db = connect(&CONFIG)
.await
.expect("Failed to connect to surreal, aborting.");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls() // .use_rustls_tls()
.gzip(true)
.build() .build()
.unwrap(); .expect("Failed to build reqwest client.");
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work. // get() to work.
let span = trace_span!("Pre-Loop"); // let mut span = TRACER.start("Pre-Loop");
let pre_loop_span = span.enter(); let site = Website::new(starting_url, false);
// Download the site process(site, db.clone(), reqwest.clone()).await;
let mut site = Website::new(&url, false); // span.end();
get(&mut site, &db, &client, &mut crawled).await;
drop(pre_loop_span); // let mut main_loop_span= TRACER.start("Main-Loop");
let mut futures = JoinSet::new();
for _ in 0..CONFIG.batch_size {
futures.spawn(process_single_thread(
&CONFIG,
db.clone(),
reqwest.clone(),
crawled.clone(),
));
}
let span = trace_span!("Loop"); while let Some(_) = futures.join_next().await {
let span = span.enter(); // Budget - Threads - This thread (1)
while crawled < budget { // Would roughly be the acceptable amount at which a thread should exit
let get_num = if budget - crawled < 100 { if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
budget - crawled warn!("Thread terminated early, restarting");
} else { futures.spawn(process_single_thread(
100 &CONFIG,
}; db.clone(),
reqwest.clone(),
let uncrawled = get_uncrawled_links(&db, get_num).await; crawled.clone(),
if uncrawled.len() == 0 { ));
info!("Had more budget but finished crawling everything.");
return;
}
debug!("Crawling {} pages...", uncrawled.len());
let span = trace_span!("Crawling");
let _ = span.enter();
for mut site in uncrawled {
get(&mut site, &db, &client, &mut crawled).await;
let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
info!("Crawled {crawled} out of {budget} pages. ({percent})");
} }
} }
drop(span);
futures.join_all().await;
// main_loop_span.end();
info!("Done"); info!("Done");
} }
#[instrument(skip_all)] async fn process_single_thread(
/// A quick helper function for downloading a url config: &Config,
async fn get( db: Surreal<Client>,
site: &mut Website, reqwest: reqwest::Client,
db: &Surreal<Client>, crawled: Arc<RwLock<usize>>,
request_client: &reqwest::Client,
count: &mut usize,
) { ) {
trace!("Get: {}", site.to_string()); while *(crawled.read().await) < config.budget {
let timer = Timer::start("Got page"); let uncrawled = get_next(&db.clone(), &config).await;
match uncrawled {
if let Ok(response) = request_client.get(site.to_string()).send().await { Some(site) => {
timer.stop(); process(site, db.clone(), reqwest.clone()).await;
SITES_CRAWLED.add(1, &[]);
// Get body // Somehow this write doesn't hang on the while's read?
let data = response.text().await.unwrap(); let mut c = crawled.write().await;
let opts = ParseOpts { *c += 1;
tree_builder: TreeBuilderOpts { }
drop_doctype: true, None => {
..Default::default() warn!("fn::get_next() returned None");
}, return;
..Default::default()
};
// Get DOM
let dom = parse_document(RcDom::default(), opts)
.from_utf8()
.read_from(&mut data.as_bytes())
.unwrap();
// TODO save the dom to minio if a flag is set
// Modify record in database
site.set_crawled();
site.store(db).await;
trace!("Got: {}", site.to_string());
// Walk all the children nodes, searching for links to other pages.
let mut buffer = Vec::new();
let timer = Timer::start("Walked");
walk(&dom.document, &db, &site, &mut buffer).await;
timer.stop();
// Put all the found links into the database.
site.links_to(buffer, &db).await;
*count += 1;
}
trace!("Failed to get: {}", site.to_string());
}
/// Walks the givin site, placing it's findings in the database
async fn walk(
node: &rcdom::Handle,
db: &Surreal<Client>,
site: &Website,
links_to: &mut Vec<Thing>,
) {
let span = trace_span!("Walk");
let span = span.enter();
// Match each node - node basically means element.
match &node.data {
rcdom::NodeData::Element { name, attrs, .. } => {
for attr in attrs.borrow().clone() {
match name.local {
local_name!("a")
| local_name!("audio")
| local_name!("area")
| local_name!("img")
| local_name!("link")
| local_name!("object")
| local_name!("source")
| local_name!("base")
| local_name!("video") => {
let attribute_name = attr.name.local.to_string();
if attribute_name == "src"
|| attribute_name == "href"
|| attribute_name == "data"
{
// Get clone of the current site object
let mut web = site.clone();
// Set url
let url = web.mut_url();
url.set_fragment(None); // removes #xyz
let joined = url.join(&attr.value).unwrap();
*url = joined;
// Set other attributes
web.crawled = false;
// TODO set element name
// let element_name = name.local.to_string();
if let Some(id) = web.store(db).await {
links_to.push(id);
}
}
}
local_name!("button") | local_name!("meta") | local_name!("iframe") => {
// dbg!(attrs);
}
_ => {}
};
} }
} }
_ => {}
};
drop(span);
for child in node.children.borrow().iter() {
Box::pin(walk(child, db, site, links_to)).await;
} }
} }
/// Returns uncrawled links #[instrument(skip(db, reqwest))]
async fn get_uncrawled_links(db: &Surreal<Client>, mut count: usize) -> Vec<Website> { /// Downloads and crawls and stores a webpage.
if count > 100 { /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
count = 100 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
} // METRICS
debug!(url = &site.site.as_str(), "Process: {}", &site.site);
BEING_PROCESSED.add(1, &[]);
// let mut process_span = TRACER.start("Process");
let mut response = db // Build the request
.query("SELECT * FROM website WHERE crawled = false LIMIT $count") let request_builder = reqwest.get(site.site.to_string());
.bind(("count", count))
.await
.expect("Hard-coded query failed..?");
response
.take(0)
.expect("Returned websites couldn't be parsed")
}
pub struct Timer<'a> { // Send the http request (get)
start: Instant, GET_IN_FLIGHT.add(1, &[]);
msg: &'a str, if let Ok(response) = request_builder.send().await {
} let mut skip_download = false;
impl<'a> Timer<'a> { GET_IN_FLIGHT.add(-1, &[]);
#[inline]
pub fn start(msg: &'a str) -> Self { let headers = response.headers();
Self { let code = response.status();
start: Instant::now(), if code != 200 {
msg, warn!("{code} for {}", site.site.as_str());
} }
#[allow(non_snake_case)]
let CT = headers.get("Content-Type");
let ct = headers.get("content-type");
let ct = match (CT, ct) {
(None, None) => {
warn!(
"Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
site.site.to_string(),
headers
);
return;
}
(None, Some(a)) => a,
(Some(a), None) => a,
(Some(a), Some(_)) => a,
};
// create filepath (handles / -> /index.html)
let real_path = filesystem::as_path(&site.site, ct);
let mut tmp_path = real_path.clone();
if !(tmp_path.add_extension("crawl_temp")) {
warn!("Failed to add extension to file");
// fallback ig
tmp_path = tmp_path.with_extension("crawl_temp");
}
// CODE FOR UPDATING DOWNLOADED CONTENT:
// Check the Content-Length header (we assume the server is telling the truth) (I don't see
// a reason for it to lie in this case).
// And see if the file on the disk is the same length.
// Yes, technically this isn't perfect, but the other option is storing ETags, which I
// don't want to do right now.
if let Some(len) = headers.get("Content-Length") {
if let Ok(s) = len.to_str() {
// length is in bytes
if let Ok(len) = s.parse::<u64>() {
if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
if disk_len == len {
skip_download = true;
}
} else {
// File not found (or other error).
// Program will continue on it's way, downloading content.
}
}
}
}
// make sure that the file is good to go
if let Some(file) = filesystem::init(&tmp_path).await {
// Get body from response
// stream the response onto the disk
let mut stream = response.bytes_stream();
let should_parse = real_path.to_string_lossy().ends_with(".html");
let mut buf: Vec<u8> = Vec::new();
if skip_download && should_parse {
// since we are skipping the download we will just read the file off the disk to
// parse it
if let Ok(mut file) = tokio::fs::OpenOptions::new()
.read(true)
.open(&real_path).await
{
if let Err(err) = file.read_to_end(&mut buf).await {
warn!("Failed to read file off disk for parsing, {}", err);
}
}
}
// !!!DOWNLOADING TIME!!!
if !skip_download {
let mut writer = BufWriter::new(file);
// Write file to disk
trace!("Writing at: {:?}", tmp_path);
BEING_STREAMED.add(1, &[]);
// let mut stream_span = TRACER.start("Stream");
while let Some(data) = stream.next().await {
match data {
Ok(data) => {
TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
let _ = writer.write_all(&data).await;
// If we are going to parse this file later, we will save it
// into memory as well as the disk.
// We do this because the data here might be incomplete
if should_parse {
data.iter().for_each(|f| buf.push(*f));
}
}
Err(err) => {
error!("{err}")
}
}
}
let _ = writer.flush().await;
// rename the temp file into the real file name
if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
error!(
from = &*tmp_path.to_string_lossy(),
to = &*real_path.to_string_lossy(),
"Error renaming file: {}",
err
);
}
// stream_span.end();
BEING_STREAMED.add(-1, &[]);
}
// (If needed) Parse the file
if should_parse {
BEING_PARSED.add(1, &[]);
// let mut parsing_span = TRACER.start("Parsing");
// Parse document and get relationships
let sites = parser::parse(&site, &buf).await;
// De-duplicate this list
let prev_len = sites.len();
let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
set.insert(item);
set
});
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
let diff = prev_len - de_dupe_sites.len();
trace!("Saved {diff} from being entered into the db by de-duping");
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
// parsing_span.end();
BEING_PARSED.add(-1, &[]);
} else {
trace!(url = site.site.as_str(), "Parse = False");
}
// update self in db
site.crawled = true;
site.status_code = code.as_u16();
Website::store_all(vec![site.clone()], &db).await;
}
} else {
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
} }
pub fn stop(&self) -> f64 {
let dif = self.start.elapsed().as_micros(); // process_span.end();
let ms = dif as f64 / 1000.; BEING_PROCESSED.add(-1, &[]);
debug!("{}", format!("{} in {:.3}ms", self.msg, ms));
ms
}
} }
impl Drop for Timer<'_> { fn load_tracing(config: &Config) -> SdkTracerProvider {
fn drop(&mut self) { // Send spans to Alloy (which will send them to Tempo)
self.stop(); let otlp_span = opentelemetry_otlp::SpanExporter::builder()
} .with_tonic()
.with_endpoint(config.tracing_endpoint.clone())
.build()
.unwrap();
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_simple_exporter(otlp_span)
.build();
tracer_provider
}
fn load_logging(config: &Config) {
// let otlp_log = opentelemetry_otlp::LogExporter::builder()
// .with_tonic()
// .with_endpoint(endpoint)
// .build()
// .unwrap();
// let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
// .with_simple_exporter(otlp_log)
// .build();
let writer = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(config.log_file.clone())
.expect("Couldn't make log file!");
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy();
let registry = Registry::default().with(
fmt::layer()
.with_line_number(true)
.with_thread_ids(true)
.with_file(true)
.json()
.with_writer(writer)
.with_filter(filter),
);
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
}
fn load_metrics(config: &Config) -> SdkMeterProvider {
// Send metrics to Prometheus
let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_protocol(Protocol::HttpBinary)
.with_endpoint(config.metrics_endpoint.clone())
.build()
.unwrap();
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
.build();
metrics_provider
} }
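
The new `main()` above is hard to read in this side-by-side view, so here is the worker-pool shape it implements, reduced to a shared counter: `batch_size` tasks run on a tokio `JoinSet`, and any task that returns while meaningful budget remains is respawned (the fix from commit 95b8af0356). `worker` stands in for `process_single_thread`; the numbers are placeholders.

```rust
use std::sync::Arc;
use tokio::{sync::RwLock, task::JoinSet};

// Stand-in for process_single_thread(): claim work until the budget is spent.
async fn worker(crawled: Arc<RwLock<usize>>, budget: usize) {
    while *crawled.read().await < budget {
        let mut count = crawled.write().await;
        *count += 1;
    }
}

#[tokio::main]
async fn main() {
    let budget = 100;
    let batch_size = 2;
    let crawled = Arc::new(RwLock::new(0usize));

    let mut futures = JoinSet::new();
    for _ in 0..batch_size {
        futures.spawn(worker(crawled.clone(), budget));
    }

    // If a worker exits while there is still meaningful budget left,
    // start a replacement, just like the loop in main() above.
    while futures.join_next().await.is_some() {
        if *crawled.read().await < budget - batch_size - 1 {
            futures.spawn(worker(crawled.clone(), budget));
        }
    }
}
```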

150
src/parser.rs Normal file

@@ -0,0 +1,150 @@
use std::default::Default;
use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*};
use tracing::{error, instrument, trace, warn};
use url::Url;
use crate::db::Website;
impl TokenSink for Website {
type Handle = Vec<Website>;
fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
match token {
TagToken(tag) => {
if tag.kind == StartTag {
match tag.name {
// this should be all the html elements that have links
local_name!("a")
| local_name!("audio")
| local_name!("area")
| local_name!("img")
| local_name!("link")
| local_name!("object")
| local_name!("source")
| local_name!("base")
| local_name!("video") => {
let mut links = Vec::new();
for attr in &tag.attrs {
let attr_name = attr.name.local.to_string();
if attr_name == "src" || attr_name == "href" || attr_name == "data"
{
trace!(url = self.site.as_str(),"Found `{}` in html `{}` tag", &attr.value, tag.name);
let url = try_get_url(&self.site, &attr.value);
if let Some(mut parsed) = url {
parsed.set_query(None);
parsed.set_fragment(None);
trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string());
let web = Website::new(&parsed.to_string(), false);
links.push(web);
}
}
}
return TokenSinkResult::Script(links);
}
local_name!("button") | local_name!("meta") | local_name!("iframe") => {
// dbg!(attrs);
}
_ => {}
}
}
}
_ => {}
}
TokenSinkResult::Continue
}
}
#[instrument(skip(data))]
/// Parses the passed site and returns all the sites it links to.
pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
// prep work
let mut other_sites: Vec<Website> = Vec::new();
// change data into something that can be tokenized
let s: Result<Tendril<fmt::UTF8>, ()> = Tendril::try_from_byte_slice(data);
if let Ok(chunk) = s {
// create buffer of tokens and push our input into it
let token_buffer = BufferQueue::default();
token_buffer.push_back(
chunk
.try_reinterpret::<fmt::UTF8>()
.expect("Failed to reinterpret chunk!"),
);
// create the tokenizer
let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
// go thru buffer
while let TokenizerResult::Script(mut sites) = tokenizer.feed(&token_buffer) {
other_sites.append(&mut sites);
// other_sites.push(sites);
}
assert!(token_buffer.is_empty());
tokenizer.end();
} else {
warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
}
other_sites
}
#[instrument]
fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
match Url::parse(link) {
Ok(ok) => Some(ok),
Err(e) => {
if link.starts_with('#') {
trace!(url = parent.as_str(), "Rejecting # url");
None
} else if link.starts_with("//") {
// if a url starts with "//" is assumed that it will adopt
// the same scheme as it's parent
// https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
let scheme = parent.scheme();
match Url::parse(&format!("{scheme}://{link}")) {
Ok(url) => Some(url),
Err(err) => {
error!("Failed parsing relative scheme url: {}", err);
None
}
}
} else {
// # This is some sort of relative url, gonna try patching it up into an absolute
// url
match e {
url::ParseError::RelativeUrlWithoutBase => {
// Is: scheme://host:port
let mut origin = parent.origin().ascii_serialization();
if !origin.ends_with('/') && !link.starts_with('/') {
origin += "/";
}
let url = origin.clone() + link;
if let Ok(url) = Url::parse(&url) {
trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string());
Some(url)
} else {
error!(
"Failed to reconstruct a url from relative url: `{}` on site: `{}`. Failed url was: {}",
link,
parent.to_string(),
url
);
None
}
}
_ => {
error!("MISC error: {:?} {:?}", e, link);
None
}
}
}
}
}
}
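
A hypothetical call into `parser::parse` (not part of the diff, assumed to live inside this crate) showing how the two relative-link cases in `try_get_url` resolve: a root-relative `href` is rebuilt against the page origin, and a scheme-relative `//` URL inherits the parent's scheme.

```rust
use crate::db::Website;
use crate::parser::parse;

async fn demo() {
    let page = Website::new("https://example.com/blog/", false);
    let html = br#"<a href="/about">About</a> <img src="//cdn.example.com/logo.png">"#;

    // Yields two uncrawled Website entries:
    //   "/about"                      -> https://example.com/about
    //   "//cdn.example.com/logo.png"  -> https://cdn.example.com/logo.png
    for site in parse(&page, html).await {
        println!("{}", site.site);
    }
}
```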

18
src/setup.surql Normal file

@@ -0,0 +1,18 @@
DEFINE TABLE IF NOT EXISTS website SCHEMALESS;
DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string;
DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
DEFINE FIELD IF NOT EXISTS processing ON TABLE website TYPE bool DEFAULT false;
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
LET $site = SELECT * FROM ONLY website WHERE crawled = false AND processing = false AND site ~ type::string($filter) LIMIT 1;
UPDATE $site.id SET processing = true;
RETURN $site
};
UPDATE website SET processing = false WHERE processing = true;