Compare commits

44 Commits

Author SHA1 Message Date
2c339a36f9 handle checking for file better 2025-10-09 23:00:11 -06:00
73216f7003 fix the issue where nothing works 2025-10-09 22:35:01 -06:00
1e59ebd5c4 even when not downloading, update the database 2025-10-09 22:13:06 -06:00
52d5e101d0 bragging 2025-10-09 22:03:19 -06:00
5b728bacd6 close #24, make program aware of the files already on disk 2025-10-09 21:52:41 -06:00
b0fe7f4761 close #18, format 2025-10-09 21:52:06 -06:00
5ade5e36df closes #11 2025-08-08 23:39:44 -06:00
95b8af0356 Restart threads that prematurely ended 2025-08-08 23:35:01 -06:00
ad8d7c606d increase csma/ca time 2025-08-08 23:34:45 -06:00
f3a51065b5 remove fixme 2025-07-17 09:37:03 -06:00
343d3a7570 better logging 2025-07-17 09:36:37 -06:00
e535bcc295 Merge branch 'main' of https://git.oliveratkinson.net/Oliver/internet_mapper 2025-07-17 08:59:32 -06:00
a0fd81d956 better config file 2025-07-17 08:58:30 -06:00
5cbba33a09 update how the database interactions work 2025-07-17 08:52:47 -06:00
83def7ba27 close #10 2025-07-16 16:07:37 -06:00
76e78cc745 better logging 2025-07-16 16:02:16 -06:00
b4038b76dd fix prometheus lol 2025-07-16 16:02:07 -06:00
caa523f1eb cleanup 2025-07-16 11:48:23 -06:00
f7bb0eef16 turn program into batch_size parrallel downloaders 2025-07-16 11:47:42 -06:00
865f9be8c0 Merge pull request 'works 😄' (#16) from tempfiles into main
Reviewed-on: #16
2025-07-16 02:26:14 +00:00
48abc73092 works 😄 2025-07-15 20:25:44 -06:00
0061866976 Merge pull request 'traces and new metrics work' (#13) from better_metrics into main
Reviewed-on: #13
2025-07-16 00:58:47 +00:00
9662b68b0d traces and new metrics work 2025-07-10 23:44:23 -06:00
6f98001d8e Merge pull request 'status_codes' (#8) from status_codes into main
Reviewed-on: #8
2025-07-11 00:49:27 +00:00
6790061e22 helper code 2025-07-09 15:58:22 -06:00
50606bb69e It isnt quite working yet 2025-04-17 09:59:23 -06:00
5850f19cab Merge pull request 'stream_response' (#6) from stream_response into main
Reviewed-on: #6
2025-04-17 15:39:49 +00:00
2c8546e30a logging cleanup 2025-04-17 09:36:27 -06:00
4e619d0ebc logging cleanup 2025-04-17 09:36:13 -06:00
647c4cd324 work off content-type header 2025-04-17 09:35:57 -06:00
7fab961d76 no longer how this is working 2025-04-17 09:35:26 -06:00
d3fff194f4 logging updates 2025-04-17 08:17:37 -06:00
3497312fd4 de-enshitified file saving logic 2025-04-17 08:17:29 -06:00
0fd76b1734 Merge pull request 'stream_response' (#4) from stream_response into main
Reviewed-on: #4
2025-04-15 21:23:54 +00:00
9bfa8f9108 batch_size 2025-04-15 13:38:28 -06:00
bdb1094a30 steam data to the disk 2025-04-15 13:07:47 -06:00
9aa2d9ce22 code settings 2025-04-15 13:06:53 -06:00
4b557a923c Merge pull request 'foss_storage' (#3) from foss_storage into main
Reviewed-on: #3
2025-04-15 15:11:59 +00:00
c08a20ac00 cleanup and more accuratly use metrics 2025-04-15 09:07:16 -06:00
94912e9125 change up how files are discovered 2025-04-15 09:06:57 -06:00
a9465dda6e add instructions 2025-03-31 15:05:18 -06:00
add6f00ed6 no recomp needed 2025-03-31 14:53:10 -06:00
4a433a1a77 This function sometimes throws errors, this logging should help 2025-03-31 14:18:37 -06:00
03cbcd9ae0 remove minio code 2025-03-31 14:18:11 -06:00
17 changed files with 933 additions and 585 deletions

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ flamegraph.svg
perf.data.old perf.data.old
/docker/logs/* /docker/logs/*
/downloaded /downloaded
/Crawler.toml

19
.vscode/launch.json vendored
View File

@@ -7,18 +7,15 @@
{ {
"type": "lldb", "type": "lldb",
"request": "launch", "request": "launch",
"name": "Debug executable 'surreal_spider'", "name": "Debug executable 'internet_mapper'",
"env": {
"RUST_LOG": "surreal_spider=trace,reqwest=info",
},
"cargo": { "cargo": {
"args": [ "args": [
"build", "build",
"--bin=surreal_spider", "--bin=internet_mapper",
"--package=surreal_spider" "--package=internet_mapper"
], ],
"filter": { "filter": {
"name": "surreal_spider", "name": "internet_mapper",
"kind": "bin" "kind": "bin"
} }
}, },
@@ -28,16 +25,16 @@
{ {
"type": "lldb", "type": "lldb",
"request": "launch", "request": "launch",
"name": "Debug unit tests in executable 'surreal_spider'", "name": "Debug unit tests in executable 'internet_mapper'",
"cargo": { "cargo": {
"args": [ "args": [
"test", "test",
"--no-run", "--no-run",
"--bin=surreal_spider", "--bin=internet_mapper",
"--package=surreal_spider" "--package=internet_mapper"
], ],
"filter": { "filter": {
"name": "surreal_spider", "name": "internet_mapper",
"kind": "bin" "kind": "bin"
} }
}, },

View File

@@ -3,6 +3,6 @@
"creds", "creds",
"reqwest", "reqwest",
"rustls", "rustls",
"surql" "surql",
] ]
} }

430
Cargo.lock generated
View File

@@ -103,62 +103,18 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "any_ascii" name = "any_ascii"
version = "0.3.2" version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea50b14b7a4b9343f8c627a7a53c52076482bd4bdad0a24fd3ec533ed616cc2c" checksum = "ea50b14b7a4b9343f8c627a7a53c52076482bd4bdad0a24fd3ec533ed616cc2c"
[[package]]
name = "anyhow"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]] [[package]]
name = "approx" name = "approx"
version = "0.4.0" version = "0.4.0"
@@ -325,17 +281,6 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "async-recursion"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.6" version = "0.3.6"
@@ -815,12 +760,6 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "2.5.0" version = "2.5.0"
@@ -871,21 +810,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "crc"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.4.2" version = "1.4.2"
@@ -984,20 +908,6 @@ dependencies = [
"parking_lot_core", "parking_lot_core",
] ]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.8.0" version = "2.8.0"
@@ -1014,17 +924,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "deunicode" name = "deunicode"
version = "1.6.1" version = "1.6.1"
@@ -1135,29 +1034,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.2" version = "1.0.2"
@@ -1727,6 +1603,19 @@ dependencies = [
"webpki-roots", "webpki-roots",
] ]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]] [[package]]
name = "hyper-tls" name = "hyper-tls"
version = "0.6.0" version = "0.6.0"
@@ -1966,10 +1855,14 @@ name = "internet_mapper"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"futures-util",
"html5ever 0.29.1", "html5ever 0.29.1",
"metrics", "metrics",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
"minio", "opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
"rand 0.9.1",
"reqwest", "reqwest",
"serde", "serde",
"surrealdb", "surrealdb",
@@ -1986,12 +1879,6 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.5" version = "0.10.5"
@@ -2025,30 +1912,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "jobserver" name = "jobserver"
version = "0.1.32" version = "0.1.32"
@@ -2293,12 +2156,6 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.4" version = "2.7.4"
@@ -2397,46 +2254,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "minio"
version = "0.2.0-alpha"
source = "git+https://github.com/minio/minio-rs.git?rev=c28f576#c28f576cb8f8cf47fb941bb9db62b2cbd6f080c1"
dependencies = [
"async-recursion",
"async-trait",
"base64 0.22.1",
"byteorder",
"bytes",
"chrono",
"crc",
"dashmap 6.1.0",
"derivative",
"env_logger",
"futures-util",
"hex",
"hmac",
"home",
"http",
"hyper",
"lazy_static",
"log",
"md5",
"multimap",
"os_info",
"percent-encoding",
"rand 0.8.5",
"regex",
"reqwest",
"serde",
"serde_json",
"sha2",
"tokio",
"tokio-stream",
"tokio-util",
"urlencoding",
"xmltree",
]
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.8.5" version = "0.8.5"
@@ -2474,15 +2291,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "multimap"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "nanoid" name = "nanoid"
version = "0.4.0" version = "0.4.0"
@@ -2735,14 +2543,77 @@ dependencies = [
] ]
[[package]] [[package]]
name = "os_info" name = "opentelemetry"
version = "3.10.0" version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a604e53c24761286860eba4e2c8b23a0161526476b1de520139d69cdb85a6b5" checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
dependencies = [ dependencies = [
"log", "futures-core",
"serde", "futures-sink",
"windows-sys 0.52.0", "js-sys",
"pin-project-lite",
"thiserror 2.0.12",
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
dependencies = [
"async-trait",
"bytes",
"http",
"opentelemetry",
"reqwest",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
dependencies = [
"http",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"reqwest",
"thiserror 2.0.12",
"tokio",
"tonic",
"tracing",
]
[[package]]
name = "opentelemetry-proto"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost",
"tonic",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"opentelemetry",
"percent-encoding",
"rand 0.9.1",
"serde_json",
"thiserror 2.0.12",
] ]
[[package]] [[package]]
@@ -2922,6 +2793,26 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315"
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.16" version = "0.2.16"
@@ -2946,15 +2837,6 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]] [[package]]
name = "powerfmt" name = "powerfmt"
version = "0.2.0" version = "0.2.0"
@@ -3004,6 +2886,29 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "psl-types" name = "psl-types"
version = "2.0.11" version = "2.0.11"
@@ -3094,7 +2999,7 @@ checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
dependencies = [ dependencies = [
"bytes", "bytes",
"getrandom 0.3.2", "getrandom 0.3.2",
"rand 0.9.0", "rand 0.9.1",
"ring", "ring",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"rustls", "rustls",
@@ -3165,13 +3070,12 @@ dependencies = [
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.9.0" version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [ dependencies = [
"rand_chacha 0.9.0", "rand_chacha 0.9.0",
"rand_core 0.9.3", "rand_core 0.9.3",
"zerocopy 0.8.23",
] ]
[[package]] [[package]]
@@ -3365,6 +3269,7 @@ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2", "h2",
@@ -4195,7 +4100,7 @@ dependencies = [
"cedar-policy", "cedar-policy",
"chrono", "chrono",
"ciborium", "ciborium",
"dashmap 5.5.3", "dashmap",
"deunicode", "deunicode",
"dmp", "dmp",
"fst", "fst",
@@ -4618,6 +4523,32 @@ dependencies = [
"winnow", "winnow",
] ]
[[package]]
name = "tonic"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.5.2" version = "0.5.2"
@@ -4626,11 +4557,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"indexmap 2.8.0",
"pin-project-lite", "pin-project-lite",
"slab",
"sync_wrapper", "sync_wrapper",
"tokio", "tokio",
"tokio-util",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing",
] ]
[[package]] [[package]]
@@ -4776,7 +4711,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
dependencies = [ dependencies = [
"rand 0.9.0", "rand 0.9.1",
"serde", "serde",
"web-time", "web-time",
] ]
@@ -4872,12 +4807,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.16.0" version = "1.16.0"
@@ -5418,21 +5347,6 @@ dependencies = [
"tap", "tap",
] ]
[[package]]
name = "xml-rs"
version = "0.8.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5b940ebc25896e71dd073bad2dbaa2abfe97b0a391415e22ad1326d9c54e3c4"
[[package]]
name = "xmltree"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b619f8c85654798007fb10afa5125590b43b088c225a25fc2fec100a9fad0fc6"
dependencies = [
"xml-rs",
]
[[package]] [[package]]
name = "yoke" name = "yoke"
version = "0.7.5" version = "0.7.5"

View File

@@ -5,12 +5,15 @@ edition = "2021"
[dependencies] [dependencies]
base64 = "0.22.1" base64 = "0.22.1"
futures-util = "0.3.31"
html5ever = "0.29" html5ever = "0.29"
metrics = "0.24.1" metrics = "0.24.1"
metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]} metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
# minio = "0.1.0" opentelemetry = "0.30.0"
minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"} opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] }
reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls"] } opentelemetry_sdk = "0.30.0"
rand = "0.9.1"
reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
surrealdb = "2.2" surrealdb = "2.2"
tokio = { version="1.41.0", features = ["full"] } tokio = { version="1.41.0", features = ["full"] }

View File

@@ -1,10 +1,22 @@
# Visability config
# Alloy (for Tempo)
tracing_endpoint = "http://localhost:4317"
# Prometheus
metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
# Alloy (for Loki)
log_file = "./docker/logs/tracing.log"
# Surreal config # Surreal config
surreal_url = "localhost:8000" surreal_url = "localhost:8000"
surreal_username = "root" surreal_username = "root"
surreal_password = "root" surreal_password = "root"
surreal_ns = "test" surreal_ns = "test"
surreal_db = "v1.19.2" surreal_db = "v1.21.1"
# Crawler config # Crawler config
crawl_filter = "en.wikipedia.com" crawl_filter = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
budget = 1000 # crawl_filter = "https://oliveratkinson.net"
start_url = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
# start_url = "https://oliveratkinson.net"
budget = 100
batch_size = 2

View File

@@ -2,22 +2,56 @@
Crawls sites saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links untill the crawl budget is reached. It saves the data of each site in a minio database. Crawls sites saving all the found links to a surrealdb database. It then proceeds to take batches of 100 uncrawled links untill the crawl budget is reached. It saves the data of each site in a minio database.
## How to use
1. Clone the repo and `cd` into it.
2. Build the repo with `cargo build -r`
3. Start the docker conatiners
1. cd into the docker folder `cd docker`
2. Bring up the docker containers `docker compose up -d`
4. From the project's root, edit the `Crawler.toml` file to your liking.
5. Run with `./target/release/internet_mapper`
You can view stats of the project at `http://<your-ip>:3000/dashboards`
```bash
# Untested script but probably works
git clone https://git.oliveratkinson.net/Oliver/internet_mapper.git
cd internet_mapper
cargo build -r
cd docker
docker compose up -d
cd ..
$EDITOR Crawler.toml
./target/release/internet_mapper
```
### TODO ### TODO
- [ ] Domain filtering - prevent the crawler from going on alternate versions of wikipedia. - [x] Domain filtering - prevent the crawler from going on alternate versions of wikipedia.
- [ ] Conditionally save content - based on filename or file contents - [ ] Conditionally save content - based on filename or file contents
- [x] GUI / TUI ? - Graphana - [x] GUI / TUI ? - Graphana
- [x] Better asynchronous getting of the sites. Currently it all happens serially. - [x] Better asynchronous getting of the sites. Currently it all happens serially.
- [ ] Allow for storing asynchronously - [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need
- [x] Control crawler via config file (no recompliation needed)
3/17/25: Took >1hr to crawl 100 pages ### Feats
3/19/25: Took 20min to crawl 1000 pages 3/17/25: Took >1hr to crawl 100 pages.
3/19/25: Took 20min to crawl 1000 pages.
This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two. This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
3/20/25: Took 5min to crawl 1000 pages 3/20/25: Took 5min to crawl 1000 pages.
3/21/25: Took 3min to crawl 1000 pages 3/21/25: Took 3min to crawl 1000 pages.
7/.../25: Downloaded just shy of 12TB of data from a remote server.
# About # About

View File

@@ -3,8 +3,8 @@ local.file_match "tmplogs" {
} }
loki.source.file "local_files" { loki.source.file "local_files" {
targets = local.file_match.tmplogs.targets targets = local.file_match.tmplogs.targets
forward_to = [loki.write.local_loki.receiver] forward_to = [loki.write.local_loki.receiver]
} }
loki.write "local_loki" { loki.write "local_loki" {
@@ -12,3 +12,25 @@ loki.write "local_loki" {
url = "http://loki:3100/loki/api/v1/push" url = "http://loki:3100/loki/api/v1/push"
} }
} }
otelcol.receiver.otlp "otlp_receiver" {
grpc {
endpoint = "0.0.0.0:4317"
}
http {
endpoint = "0.0.0.0:4318"
}
output {
traces = [otelcol.exporter.otlp.tempo.input,]
}
}
otelcol.exporter.otlp "tempo" {
client {
endpoint = "tempo:4317"
tls {
insecure = true
}
}
}

View File

@@ -1,4 +1,6 @@
services: services:
# Database
surreal: surreal:
image: surrealdb/surrealdb:latest-dev image: surrealdb/surrealdb:latest-dev
ports: ports:
@@ -15,6 +17,18 @@ services:
- root - root
- rocksdb:/mydata/database.db - rocksdb:/mydata/database.db
# Tracing
tempo:
image: grafana/tempo:latest
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./tempo.yaml:/etc/tempo.yaml
- tempo_storage:/var/tempo
ports:
- 3200:3200 # self metrics for prometheus
- 4317:4317 # otlp grpc - (alloy)
# Log scraper
alloy: alloy:
image: grafana/alloy:latest image: grafana/alloy:latest
ports: ports:
@@ -24,9 +38,13 @@ services:
- ./logs/:/tmp/alloy-logs - ./logs/:/tmp/alloy-logs
- ./alloy.conf:/etc/alloy/config.alloy - ./alloy.conf:/etc/alloy/config.alloy
- alloy_storage:/var/lib/alloy - alloy_storage:/var/lib/alloy
command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy command:
- run
- --server.http.listen-addr=0.0.0.0:12345
- --storage.path=/var/lib/alloy/data
- /etc/alloy/config.alloy
#logs # Log storage / analysis
loki: loki:
image: grafana/loki:latest image: grafana/loki:latest
ports: ports:
@@ -35,16 +53,21 @@ services:
volumes: volumes:
- ./loki.yaml:/etc/loki/local-config.yaml - ./loki.yaml:/etc/loki/local-config.yaml
# Metrics collector # Metrics
prometheus: prometheus:
image: prom/prometheus:latest image: prom/prometheus:latest
expose: ports:
- 9090 - 9090:9090
volumes: volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml - ./prometheus.yaml:/etc/prometheus/prometheus.yml
# persist data # persist data
- prometheus_storage:/prometheus # - prometheus_storage:/prometheus
command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml command:
- --enable-feature=native-histograms
- --web.enable-remote-write-receiver
- --web.enable-lifecycle
- --web.enable-otlp-receiver
- --config.file=/etc/prometheus/prometheus.yml
# Everything viewer # Everything viewer
grafana: grafana:
@@ -66,4 +89,4 @@ volumes:
grafana_storage: grafana_storage:
alloy_storage: alloy_storage:
surrealdb_storage: surrealdb_storage:
minio_storage: tempo_storage:

View File

@@ -22,3 +22,20 @@ datasources:
editable: false editable: false
jsonData: jsonData:
httpMethod: GET httpMethod: GET
- name: Tempo
type: tempo
access: proxy
orgId: 1
url: http://tempo:3200
basicAuth: false
isDefault: false
version: 1
editable: true
apiVersion: 1
uid: tempo
jsonData:
httpMethod: GET
serviceMap:
datasourceUid: prometheus
streamingEnabled:
search: true

View File

@@ -1,17 +1,15 @@
global: global:
scrape_interval: 5s scrape_interval: 60s
query_log_file: /etc/prometheus/query.log query_log_file: /etc/prometheus/query.log
scrape_configs: scrape_configs:
- job_name: crawler # Crawler configs get pushed with OTLP
# - job_name: 'loki'
# static_configs:
# - targets: ['loki:3100']
# - job_name: 'prometheus'
# static_configs:
# - targets: ['localhost:9090']
- job_name: 'tempo'
static_configs: static_configs:
# change this your machine's ip, localhost won't work - targets: ['tempo:3200']
# because localhost refers to the docker container.
- targets: ['172.20.239.48:2500']
#- targets: ['192.168.8.209:2500']
- job_name: loki
static_configs:
- targets: ['loki:3100']
- job_name: prometheus
static_configs:
- targets: ['localhost:9090']

48
docker/tempo.yaml Normal file
View File

@@ -0,0 +1,48 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info
query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
metadata_slo:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: "tempo:4317"
metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces
storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks
overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both

View File

@@ -1,12 +1,13 @@
use metrics::counter; use metrics::counter;
use std::fmt::Debug;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt::Debug, time::Duration};
use surrealdb::{ use surrealdb::{
engine::remote::ws::{Client, Ws}, engine::remote::ws::{Client, Ws},
opt::auth::Root, opt::auth::Root,
sql::Thing, sql::Thing,
Surreal, Surreal,
}; };
use tokio::time::sleep;
use tracing::{error, instrument, trace}; use tracing::{error, instrument, trace};
use url::Url; use url::Url;
@@ -16,16 +17,23 @@ const STORE: &str = "surql_store_calls";
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
pub struct Website { pub struct Website {
pub id: Option<Thing>,
/// The url that this data is found at /// The url that this data is found at
pub site: Url, pub site: Url,
/// Wether or not this link has been crawled yet /// Wether or not this link has been crawled yet
pub crawled: bool, pub crawled: bool,
/// 200, 404, etc
pub status_code: u16,
} }
// manual impl to make tracing look nicer // manual impl to make tracing look nicer
impl Debug for Website { impl Debug for Website {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Website").field("site", &self.site).finish() f.debug_struct("Website")
.field("host", &self.site.host())
.field("path", &self.site.path())
.field("status_code", &self.status_code)
.finish()
} }
} }
@@ -38,17 +46,15 @@ impl Website {
}; };
Self { Self {
crawled, crawled,
site site,
status_code: 0,
id: None,
} }
} }
pub fn set_crawled(&mut self) {
trace!("Set crawled to true");
self.crawled = true
}
// Insert ever item in the vec into surreal, crawled state will be preserved as TRUE // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
// if already in the database as such or incoming data is TRUE. // if already in the database as such or incoming data is TRUE.
#[instrument(skip(db))]
pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> { pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
counter!(STORE).increment(1); counter!(STORE).increment(1);
let mut things = Vec::with_capacity(all.len()); let mut things = Vec::with_capacity(all.len());
@@ -58,6 +64,8 @@ impl Website {
"INSERT INTO website $array "INSERT INTO website $array
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
accessed_at = time::now(), accessed_at = time::now(),
status_code = $input.status_code,
processing = false,
crawled = crawled OR $input.crawled crawled = crawled OR $input.crawled
RETURN VALUE id; RETURN VALUE id;
", ",
@@ -77,18 +85,47 @@ impl Website {
} }
} }
/// Returns uncrawled links
#[instrument(skip(db, config))]
pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website> {
let mut res: Option<Website> = None;
let mut fails = 0;
while res == None {
let mut response = db
.query("fn::get_next($format)")
.bind(("format", config.crawl_filter.to_string()))
.await
.expect("Hard-coded query failed..?");
res = match response.take(0) {
Ok(ok) => ok,
Err(_err) => {
// basically just CSMA/CA
let delay = rand::random_range(10..10_000);
sleep(Duration::from_millis(delay)).await;
fails += 1;
// Don't get stuck here forever, failing...
// (most I've seen is 1)
if fails > 5 {
error!("Max attempts to get_next() reached... ({fails})");
return None
}
None
}
};
}
res
}
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[allow(dead_code)]
pub struct Email { pub struct Email {
pub email: String, pub email: String,
pub on: String, pub on: String,
} }
#[derive(Debug, Deserialize)]
pub struct Record {
#[allow(dead_code)]
pub id: Thing,
}
#[instrument(skip_all, name = "SurrealDB")] #[instrument(skip_all, name = "SurrealDB")]
pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> { pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
trace!("Establishing connection to surreal..."); trace!("Establishing connection to surreal...");
@@ -109,12 +146,11 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
.await?; .await?;
let setup = include_bytes!("setup.surql"); let setup = include_bytes!("setup.surql");
let file = setup.iter().map(|c| *c as char).collect::<String>(); let init_commands = setup.iter().map(|c| *c as char).collect::<String>();
db.query(file) db.query(init_commands)
.await .await
.expect("Failed to setup surreal tables."); .expect("Failed to setup surreal tables.");
Ok(db) Ok(db)
} }

View File

@@ -1,70 +1,105 @@
use std::{ffi::OsStr, path::PathBuf}; use std::{io::ErrorKind, path::PathBuf};
use reqwest::header::HeaderValue;
use tokio::fs; use tokio::fs;
use tracing::{debug, error, instrument, trace, warn}; use tracing::{error, trace, warn};
use url::Url; use url::Url;
#[instrument(skip(data))] pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
pub async fn store(data: &str, url: &Url) {
// extract data from url to save it accurately // extract data from url to save it accurately
let url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path()); let mut url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
// if it's a file if let Ok(header) = content_type.to_str() {
let (basepath, filename) = if url_path.extension().filter(valid_file_extension).is_some() { // text/html; charset=UTF-8; option=value
// get everything up till the file let ttype = if let Some((t, _)) = header.split_once(';') {
let basepath = url_path.ancestors().skip(1).take(1).collect::<PathBuf>(); t
// get the file name } else {
let filename = url_path.file_name().expect("This should exist").to_string_lossy(); header
trace!("Save path: {:?} and base path: {:?}", &url_path, &basepath); };
(basepath, filename.to_string())
if let Some((ttype, subtype)) = ttype.split_once('/') {
trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
// If the Content-Type header is "*/html" (most likely "text/html") and the path's
// extension is anything but html:
if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
// time to slap a index.html to the end of that path there!
url_path = url_path.join("index.html");
}
}
} else { } else {
(url_path.clone(), "index.html".into()) warn!("Header: {:?} couldn't be parsed into a string!", content_type);
}; }
trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
debug!("Writing at: {:?} {:?}", basepath, filename); url_path
}
// create the folders pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
if let Err(err) = fs::create_dir_all(&basepath).await { match tokio::fs::OpenOptions::new()
error!("Dir creation: {err} {:?}", basepath); .write(false)
} else { .read(true)
// FIXME I don't think this handles index.html files well... .create(false)
// TODO this should probably append .html to non-described files .open(file).await
// create the file if that was successful {
if let Err(err) = fs::write(&basepath.join(filename), data).await { Ok(file) => {
error!("File creation: {err} {:?}", url_path); match file.metadata().await {
Ok(meta) => {
return Some(meta.len())
},
Err(err) => {
error!("Failed to get metadata. {}", err)
},
}
},
Err(err) => {
match err.kind() {
ErrorKind::NotFound => {/* ignore */},
_ => warn!("Failed to open file to check length. {:?} {}", file, err),
}
},
}
None
}
pub async fn init(filename: &PathBuf) -> Option<fs::File> {
let file = async || tokio::fs::OpenOptions::new()
.write(true)
.append(false)
.create(true)
.open(&filename).await;
match file().await {
Ok(ok) => {
trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
Some(ok)
},
Err(err) => {
// the file/folder isn't found
if err.kind() == ErrorKind::NotFound {
if let Some(parent ) = &filename.parent() {
// create the folders
if let Err(err) = fs::create_dir_all(&parent).await {
error!("Dir creation: {err} {:?}", filename);
} else if let Ok(ok) = file().await {
return Some(ok);
}
} else {
error!("Couldn't get file's parents: {:?}", &filename);
}
} else if err.kind() == ErrorKind::NotADirectory {
// Example:
// 1. example.com/user
// 2. example.com/user/post
// If file 1 exists it will prevent file 2 from existing
// FIXME
error!("One of the parent directories is actually a file...")
} else {
error!("File open error: {err} {:?}", filename);
}
// we don't care about other errors, we can't/shouldn't fix them
None
} }
} }
} }
fn valid_file_extension(take: &&OsStr) -> bool {
let los = take.to_string_lossy();
let all = los.split('.');
match all.last() {
Some(s) => {
match s.to_lowercase().as_str() {
"html" => true,
"css" => true,
"js" => true,
"ts" => true,
"otf" => true, // font
"png" => true,
"svg" => true,
"jpg" => true,
"jpeg" => true,
"mp4" => true,
"mp3" => true,
"webp" => true,
"pdf" => true,
"json" => true,
"xml" => true,
_ => {
warn!("Might be forgetting a file extension: {s}");
false
}
}
},
None => false,
}
}

View File

@@ -1,31 +1,83 @@
#![feature(ip_from)] #![feature(ip_from)]
#![feature(path_add_extension)]
#![deny(clippy::unwrap_used)]
extern crate html5ever; extern crate html5ever;
use std::{ use std::{
collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr} collections::HashSet,
fs::File,
io::Read,
sync::{Arc, LazyLock},
}; };
use db::{connect, Website}; use db::{connect, Website};
use metrics::{counter, gauge}; use futures_util::StreamExt;
use metrics_exporter_prometheus::PrometheusBuilder; use opentelemetry::{
global::{self},
metrics::{Counter, Meter, UpDownCounter},
};
use opentelemetry_otlp::{Protocol, WithExportConfig};
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
use serde::Deserialize; use serde::Deserialize;
use surrealdb::{engine::remote::ws::Client, Surreal}; use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::task::JoinSet; use tokio::{
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span}; io::{AsyncReadExt, AsyncWriteExt, BufWriter},
sync::RwLock,
task::JoinSet,
};
use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry}; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
mod db; use crate::db::get_next;
mod parser;
mod filesystem;
const GET_METRIC: &str = "total_gets"; mod db;
const GET_IN_FLIGHT: &str = "gets_in_flight"; mod filesystem;
const SITES_CRAWLED: &str = "pages_crawled"; mod parser;
const BEING_PROCESSED: &str = "pages_being_processed";
static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
static BATCH_SIZE: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_processed")
.build()
});
static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_parsed")
.build()
});
static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
METER
.i64_up_down_counter("crawler_pages_being_streamed")
.build()
});
static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
static SITES_CRAWLED: LazyLock<Counter<u64>> =
LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
static CONFIG: LazyLock<Config> = LazyLock::new(|| {
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
config
});
// FIXME Traces aren't working on multiple threads, they block
// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
tracing_endpoint: String,
metrics_endpoint: String,
log_file: String,
surreal_ns: String, surreal_ns: String,
surreal_db: String, surreal_db: String,
surreal_url: String, surreal_url: String,
@@ -33,15 +85,302 @@ struct Config {
surreal_password: String, surreal_password: String,
crawl_filter: String, crawl_filter: String,
start_url: String,
budget: usize, budget: usize,
batch_size: usize,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
println!("Logs and metrics are provided to the Grafana dashboard");
// Start TRACE / LOGGING / METRICS
load_logging(&CONFIG); // this seems to be working ok
global::set_tracer_provider(load_tracing(&CONFIG));
global::set_meter_provider(load_metrics(&CONFIG));
BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let crawled = Arc::new(RwLock::new(0));
let starting_url = &CONFIG.start_url;
let db = connect(&CONFIG)
.await
.expect("Failed to connect to surreal, aborting.");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls()
.gzip(true)
.build()
.expect("Failed to build reqwest client.");
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for
// get() to work.
// let mut span = TRACER.start("Pre-Loop");
let site = Website::new(starting_url, false);
process(site, db.clone(), reqwest.clone()).await;
// span.end();
// let mut main_loop_span= TRACER.start("Main-Loop");
let mut futures = JoinSet::new();
for _ in 0..CONFIG.batch_size {
futures.spawn(process_single_thread(
&CONFIG,
db.clone(),
reqwest.clone(),
crawled.clone(),
));
}
while let Some(_) = futures.join_next().await {
// Budget - Threads - This thread (1)
// Would roughly be the acceptable amount at which a thread should exit
if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
warn!("Thread terminated early, restarting");
futures.spawn(process_single_thread(
&CONFIG,
db.clone(),
reqwest.clone(),
crawled.clone(),
));
}
}
futures.join_all().await;
// main_loop_span.end();
info!("Done");
}
async fn process_single_thread(
config: &Config,
db: Surreal<Client>,
reqwest: reqwest::Client,
crawled: Arc<RwLock<usize>>,
) {
while *(crawled.read().await) < config.budget {
let uncrawled = get_next(&db.clone(), &config).await;
match uncrawled {
Some(site) => {
process(site, db.clone(), reqwest.clone()).await;
SITES_CRAWLED.add(1, &[]);
// Somehow this write doesn't hang on the while's read?
let mut c = crawled.write().await;
*c += 1;
}
None => {
warn!("fn::get_next() returned None");
return;
}
}
}
}
#[instrument(skip(db, reqwest))]
/// Downloads and crawls and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// METRICS
debug!(url = &site.site.as_str(), "Process: {}", &site.site);
BEING_PROCESSED.add(1, &[]);
// let mut process_span = TRACER.start("Process");
// Build the request
let request_builder = reqwest.get(site.site.to_string());
// Send the http request (get)
GET_IN_FLIGHT.add(1, &[]);
if let Ok(response) = request_builder.send().await {
let mut skip_download = false;
GET_IN_FLIGHT.add(-1, &[]);
let headers = response.headers();
let code = response.status();
if code != 200 {
warn!("{code} for {}", site.site.as_str());
}
#[allow(non_snake_case)]
let CT = headers.get("Content-Type");
let ct = headers.get("content-type");
let ct = match (CT, ct) {
(None, None) => {
warn!(
"Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
site.site.to_string(),
headers
);
return;
}
(None, Some(a)) => a,
(Some(a), None) => a,
(Some(a), Some(_)) => a,
};
// create filepath (handles / -> /index.html)
let real_path = filesystem::as_path(&site.site, ct);
let mut tmp_path = real_path.clone();
if !(tmp_path.add_extension("crawl_temp")) {
warn!("Failed to add extension to file");
// fallback ig
tmp_path = tmp_path.with_extension("crawl_temp");
}
// CODE FOR UPDATING DOWNLOADED CONTENT:
// Check the Content-Length header (we assume the server is telling the truth) (I don't see
// a reason for it to lie in this case).
// And see if the file on the disk is the same length.
// Yes, technically this isn't perfect, but the other option is storing ETags, which I
// don't want to do right now.
if let Some(len) = headers.get("Content-Length") {
if let Ok(s) = len.to_str() {
// length is in bytes
if let Ok(len) = s.parse::<u64>() {
if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
if disk_len == len {
skip_download = true;
}
} else {
// File not found (or other error).
// Program will continue on it's way, downloading content.
}
}
}
}
// make sure that the file is good to go
if let Some(file) = filesystem::init(&tmp_path).await {
// Get body from response
// stream the response onto the disk
let mut stream = response.bytes_stream();
let should_parse = real_path.to_string_lossy().ends_with(".html");
let mut buf: Vec<u8> = Vec::new();
if skip_download && should_parse {
// since we are skipping the download we will just read the file off the disk to
// parse it
if let Ok(mut file) = tokio::fs::OpenOptions::new()
.read(true)
.open(&real_path).await
{
if let Err(err) = file.read_to_end(&mut buf).await {
warn!("Failed to read file off disk for parsing, {}", err);
}
}
}
// !!!DOWNLOADING TIME!!!
if !skip_download {
let mut writer = BufWriter::new(file);
// Write file to disk
trace!("Writing at: {:?}", tmp_path);
BEING_STREAMED.add(1, &[]);
// let mut stream_span = TRACER.start("Stream");
while let Some(data) = stream.next().await {
match data {
Ok(data) => {
TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
let _ = writer.write_all(&data).await;
// If we are going to parse this file later, we will save it
// into memory as well as the disk.
// We do this because the data here might be incomplete
if should_parse {
data.iter().for_each(|f| buf.push(*f));
}
}
Err(err) => {
error!("{err}")
}
}
}
let _ = writer.flush().await;
// rename the temp file into the real file name
if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
error!(
from = &*tmp_path.to_string_lossy(),
to = &*real_path.to_string_lossy(),
"Error renaming file: {}",
err
);
}
// stream_span.end();
BEING_STREAMED.add(-1, &[]);
}
// (If needed) Parse the file
if should_parse {
BEING_PARSED.add(1, &[]);
// let mut parsing_span = TRACER.start("Parsing");
// Parse document and get relationships
let sites = parser::parse(&site, &buf).await;
// De-duplicate this list
let prev_len = sites.len();
let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
set.insert(item);
set
});
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
let diff = prev_len - de_dupe_sites.len();
trace!("Saved {diff} from being entered into the db by de-duping");
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
// parsing_span.end();
BEING_PARSED.add(-1, &[]);
} else {
trace!(url = site.site.as_str(), "Parse = False");
}
// update self in db
site.crawled = true;
site.status_code = code.as_u16();
Website::store_all(vec![site.clone()], &db).await;
}
} else {
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
}
// process_span.end();
BEING_PROCESSED.add(-1, &[]);
}
fn load_tracing(config: &Config) -> SdkTracerProvider {
// Send spans to Alloy (which will send them to Tempo)
let otlp_span = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(config.tracing_endpoint.clone())
.build()
.unwrap();
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_simple_exporter(otlp_span)
.build();
tracer_provider
}
fn load_logging(config: &Config) {
// let otlp_log = opentelemetry_otlp::LogExporter::builder()
// .with_tonic()
// .with_endpoint(endpoint)
// .build()
// .unwrap();
// let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
// .with_simple_exporter(otlp_log)
// .build();
let writer = std::fs::OpenOptions::new() let writer = std::fs::OpenOptions::new()
.append(true) .append(true)
.create(true) .create(true)
.open("./docker/logs/tracing.log") .open(config.log_file.clone())
.expect("Couldn't make log file!"); .expect("Couldn't make log file!");
let filter = EnvFilter::builder() let filter = EnvFilter::builder()
@@ -55,167 +394,22 @@ async fn main() {
.with_file(true) .with_file(true)
.json() .json()
.with_writer(writer) .with_writer(writer)
.with_filter(filter) .with_filter(filter),
); );
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber"); tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
}
let builder = PrometheusBuilder::new(); fn load_metrics(config: &Config) -> SdkMeterProvider {
builder // Send metrics to Prometheus
.with_http_listener(std::net::SocketAddr::new( let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])), .with_http()
2500, .with_protocol(Protocol::HttpBinary)
)) .with_endpoint(config.metrics_endpoint.clone())
.install()
.expect("failed to install recorder/exporter");
info!("Starting...");
// Would probably take these in as parameters from a cli
let starting_url = "https://en.wikipedia.org/";
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
// let crawl_filter = "en.wikipedia.org/";
// let budget = 50;
let mut crawled = 0;
let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
let mut buf = String::new();
let _ = file.read_to_string(&mut buf);
let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
let db = connect(&config)
.await
.expect("Failed to connect to surreal, aborting.");
let reqwest = reqwest::Client::builder()
// .use_rustls_tls()
.gzip(true)
.build() .build()
.expect("Failed to build reqwest client."); .unwrap();
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
// Kick off the whole machine - This Website object doesn't matter, it's just to allow for .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
// get() to work. .build();
let span = trace_span!("Pre-Loop"); metrics_provider
let pre_loop_span = span.enter();
// Download the site
let site = Website::new(starting_url, false);
process(site, db.clone(), reqwest.clone()).await;
drop(pre_loop_span);
let span = trace_span!("Loop");
let span = span.enter();
while crawled < config.budget {
let get_num = if config.budget - crawled < 100 {
config.budget - crawled
} else {
100
};
let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
if uncrawled.is_empty() {
info!("Had more budget but finished crawling everything.");
return;
}
{
let mut futures = JoinSet::new();
for site in uncrawled {
gauge!(BEING_PROCESSED).increment(1);
futures.spawn(process(site, db.clone(), reqwest.clone()));
// let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
// info!("Crawled {crawled} out of {budget} pages. ({percent})");
}
let c = counter!(SITES_CRAWLED);
// As futures complete runs code in while block
while futures.join_next().await.is_some() {
c.increment(1);
gauge!(BEING_PROCESSED).decrement(1);
crawled += 1;
}
}
}
drop(span);
info!("Done");
} }
#[instrument(skip(db, reqwest))]
/// Downloads and crawls and stores a webpage.
/// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
// METRICS
trace!("Process: {}", &site.site);
// Build the request
let request_builder = reqwest.get(site.site.to_string());
// METRICS
let g = gauge!(GET_IN_FLIGHT);
g.increment(1);
// Send the http request (get)
if let Ok(response) = request_builder.send().await {
// METRICS
g.decrement(1);
counter!(GET_METRIC).increment(1);
// Get body from response
let data = response
.text()
.await
.expect("Failed to read http response's body!");
// Store document
filesystem::store(&data, &site.site).await;
// Parse document and get relationships
let sites = parser::parse(&site, &data).await;
// update self in db
site.set_crawled();
Website::store_all(vec![site], &db).await;
// De-duplicate this list
let prev_len = sites.len();
let set = sites.into_iter().fold(HashSet::new(), |mut set,item| {
set.insert(item);
set
});
let de_dupe_sites: Vec<Website> = set.into_iter().collect();
let diff = prev_len - de_dupe_sites.len();
trace!("Saved {diff} from being entered into the db by de-duping");
// Store all the other sites so that we can link to them.
let _ = Website::store_all(de_dupe_sites, &db).await;
} else {
error!("Failed to get: {}", &site.site);
}
}
/// Returns uncrawled links
#[instrument(skip(db))]
async fn get_uncrawled_links(
db: &Surreal<Client>,
mut count: usize,
filter: String,
) -> Vec<Website> {
if count > 100 {
count = 100
}
debug!("Getting uncrawled links");
let mut response = db
.query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
.bind(("format", filter))
.bind(("count", count))
.await
.expect("Hard-coded query failed..?");
response
.take(0)
.expect("Returned websites couldn't be parsed")
}

View File

@@ -1,11 +1,10 @@
use std::default::Default; use std::default::Default;
use std::str::FromStr;
use html5ever::tokenizer::{BufferQueue, TokenizerResult}; use html5ever::tokenizer::{BufferQueue, TokenizerResult};
use html5ever::tokenizer::{StartTag, TagToken}; use html5ever::tokenizer::{StartTag, TagToken};
use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts}; use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
use html5ever::{local_name, tendril::*}; use html5ever::{local_name, tendril::*};
use tracing::{debug, error, instrument, trace, warn}; use tracing::{error, instrument, trace, warn};
use url::Url; use url::Url;
use crate::db::Website; use crate::db::Website;
@@ -13,7 +12,6 @@ use crate::db::Website;
impl TokenSink for Website { impl TokenSink for Website {
type Handle = Vec<Website>; type Handle = Vec<Website>;
#[instrument(skip(token, _line_number))]
fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> { fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
match token { match token {
TagToken(tag) => { TagToken(tag) => {
@@ -34,13 +32,13 @@ impl TokenSink for Website {
let attr_name = attr.name.local.to_string(); let attr_name = attr.name.local.to_string();
if attr_name == "src" || attr_name == "href" || attr_name == "data" if attr_name == "src" || attr_name == "href" || attr_name == "data"
{ {
trace!("Found `{}` in html `{}` tag", &attr.value, tag.name); trace!(url = self.site.as_str(),"Found `{}` in html `{}` tag", &attr.value, tag.name);
let url = try_get_url(&self.site, &attr.value); let url = try_get_url(&self.site, &attr.value);
if let Some(mut parsed) = url { if let Some(mut parsed) = url {
parsed.set_query(None); parsed.set_query(None);
parsed.set_fragment(None); parsed.set_fragment(None);
debug!("Final cleaned URL: `{}`", parsed.to_string()); trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string());
let web = Website::new(&parsed.to_string(), false); let web = Website::new(&parsed.to_string(), false);
links.push(web); links.push(web);
} }
@@ -61,31 +59,36 @@ impl TokenSink for Website {
} }
} }
#[instrument(skip_all)] #[instrument(skip(data))]
/// Parses the passed site and returns all the sites it links to. /// Parses the passed site and returns all the sites it links to.
pub async fn parse(site: &Website, data: &str) -> Vec<Website> { pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
// prep work // prep work
let mut other_sites: Vec<Website> = Vec::new(); let mut other_sites: Vec<Website> = Vec::new();
// change data into something that can be tokenized // change data into something that can be tokenized
let chunk = Tendril::from_str(data).expect("Failed to parse string into Tendril!"); let s: Result<Tendril<fmt::UTF8>, ()> = Tendril::try_from_byte_slice(data);
// create buffer of tokens and push our input into it if let Ok(chunk) = s {
let token_buffer = BufferQueue::default(); // create buffer of tokens and push our input into it
token_buffer.push_back( let token_buffer = BufferQueue::default();
chunk token_buffer.push_back(
.try_reinterpret::<fmt::UTF8>() chunk
.expect("Failed to reinterpret chunk!"), .try_reinterpret::<fmt::UTF8>()
); .expect("Failed to reinterpret chunk!"),
// create the tokenizer );
let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default()); // create the tokenizer
let tokenizer = Tokenizer::new(site.clone(), TokenizerOpts::default());
// go thru buffer // go thru buffer
while let TokenizerResult::Script(mut sites) = tokenizer.feed(&token_buffer) { while let TokenizerResult::Script(mut sites) = tokenizer.feed(&token_buffer) {
other_sites.append(&mut sites); other_sites.append(&mut sites);
// other_sites.push(sites); // other_sites.push(sites);
}
assert!(token_buffer.is_empty());
tokenizer.end();
} else {
warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
} }
assert!(token_buffer.is_empty());
tokenizer.end();
other_sites other_sites
} }
@@ -96,7 +99,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
Ok(ok) => Some(ok), Ok(ok) => Some(ok),
Err(e) => { Err(e) => {
if link.starts_with('#') { if link.starts_with('#') {
trace!("Rejecting # url"); trace!(url = parent.as_str(), "Rejecting # url");
None None
} else if link.starts_with("//") { } else if link.starts_with("//") {
// if a url starts with "//" is assumed that it will adopt // if a url starts with "//" is assumed that it will adopt
@@ -104,32 +107,34 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
// https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute // https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
let scheme = parent.scheme(); let scheme = parent.scheme();
match Url::parse(&format!("{scheme}://{}", link)) { match Url::parse(&format!("{scheme}://{link}")) {
Ok(url) => Some(url), Ok(url) => Some(url),
Err(err) => { Err(err) => {
error!("Failed parsing realative scheme url: {}", err); error!("Failed parsing relative scheme url: {}", err);
None None
} }
} }
} else { } else {
// # This is some sort of realative url, gonna try patching it up into an absolute // # This is some sort of relative url, gonna try patching it up into an absolute
// url // url
match e { match e {
url::ParseError::RelativeUrlWithoutBase => { url::ParseError::RelativeUrlWithoutBase => {
// Is: scheme://host:port // Is: scheme://host:port
let origin = parent.origin().ascii_serialization(); let mut origin = parent.origin().ascii_serialization();
if !origin.ends_with('/') && !link.starts_with('/') {
origin += "/";
}
let url = origin.clone() + link; let url = origin.clone() + link;
trace!("Built `{url}` from `{origin} + {}`", link.to_string());
if let Ok(url) = Url::parse(&url) { if let Ok(url) = Url::parse(&url) {
trace!("Saved relative url `{}` AS: `{}`", link, url); trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string());
Some(url) Some(url)
} else { } else {
error!( error!(
"Failed to reconstruct a url from relative url: `{}` on site: `{}`", "Failed to reconstruct a url from relative url: `{}` on site: `{}`. Failed url was: {}",
link, link,
parent.to_string() parent.to_string(),
url
); );
None None
} }

View File

@@ -4,6 +4,15 @@ DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string;
DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE; DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool; DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
DEFINE FIELD IF NOT EXISTS processing ON TABLE website TYPE bool DEFAULT false;
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now(); DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now(); DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
LET $site = SELECT * FROM ONLY website WHERE crawled = false AND processing = false AND site ~ type::string($filter) LIMIT 1;
UPDATE $site.id SET processing = true;
RETURN $site
};
UPDATE website SET processing = false WHERE processing = true;