Compare commits

35 Commits

SHA1 Message Date
2c339a36f9 handle checking for file better 2025-10-09 23:00:11 -06:00
73216f7003 fix the issue where nothing works 2025-10-09 22:35:01 -06:00
1e59ebd5c4 even when not downloading, update the database 2025-10-09 22:13:06 -06:00
52d5e101d0 bragging 2025-10-09 22:03:19 -06:00
5b728bacd6 close #24, make program aware of the files already on disk 2025-10-09 21:52:41 -06:00
b0fe7f4761 close #18, format 2025-10-09 21:52:06 -06:00
5ade5e36df closes #11 2025-08-08 23:39:44 -06:00
95b8af0356 Restart threads that prematurely ended 2025-08-08 23:35:01 -06:00
ad8d7c606d increase csma/ca time 2025-08-08 23:34:45 -06:00
f3a51065b5 remove fixme 2025-07-17 09:37:03 -06:00
343d3a7570 better logging 2025-07-17 09:36:37 -06:00
e535bcc295 Merge branch 'main' of https://git.oliveratkinson.net/Oliver/internet_mapper 2025-07-17 08:59:32 -06:00
a0fd81d956 better config file 2025-07-17 08:58:30 -06:00
5cbba33a09 update how the database interactions work 2025-07-17 08:52:47 -06:00
83def7ba27 close #10 2025-07-16 16:07:37 -06:00
76e78cc745 better logging 2025-07-16 16:02:16 -06:00
b4038b76dd fix prometheus lol 2025-07-16 16:02:07 -06:00
caa523f1eb cleanup 2025-07-16 11:48:23 -06:00
f7bb0eef16 turn program into batch_size parrallel downloaders 2025-07-16 11:47:42 -06:00
865f9be8c0 Merge pull request 'works 😄' (#16) from tempfiles into main
Reviewed-on: #16
2025-07-16 02:26:14 +00:00
48abc73092 works 😄 2025-07-15 20:25:44 -06:00
0061866976 Merge pull request 'traces and new metrics work' (#13) from better_metrics into main
Reviewed-on: #13
2025-07-16 00:58:47 +00:00
9662b68b0d traces and new metrics work 2025-07-10 23:44:23 -06:00
6f98001d8e Merge pull request 'status_codes' (#8) from status_codes into main
Reviewed-on: #8
2025-07-11 00:49:27 +00:00
6790061e22 helper code 2025-07-09 15:58:22 -06:00
50606bb69e It isnt quite working yet 2025-04-17 09:59:23 -06:00
5850f19cab Merge pull request 'stream_response' (#6) from stream_response into main
Reviewed-on: #6
2025-04-17 15:39:49 +00:00
2c8546e30a logging cleanup 2025-04-17 09:36:27 -06:00
4e619d0ebc logging cleanup 2025-04-17 09:36:13 -06:00
647c4cd324 work off content-type header 2025-04-17 09:35:57 -06:00
7fab961d76 no longer how this is working 2025-04-17 09:35:26 -06:00
d3fff194f4 logging updates 2025-04-17 08:17:37 -06:00
3497312fd4 de-enshitified file saving logic 2025-04-17 08:17:29 -06:00
0fd76b1734 Merge pull request 'stream_response' (#4) from stream_response into main
Reviewed-on: #4
2025-04-15 21:23:54 +00:00
9bfa8f9108 batch_size 2025-04-15 13:38:28 -06:00
16 changed files with 780 additions and 1958 deletions

.gitignore (vendored, 1 line changed)

@@ -6,3 +6,4 @@ flamegraph.svg
 perf.data.old
 /docker/logs/*
 /downloaded
+/Crawler.toml

Cargo.lock (generated, 429 lines changed)

@@ -103,62 +103,18 @@ dependencies = [
  "libc",
 ]
-
-[[package]]
-name = "anstream"
-version = "0.6.18"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
-dependencies = [
- "anstyle",
- "anstyle-parse",
- "anstyle-query",
- "anstyle-wincon",
- "colorchoice",
- "is_terminal_polyfill",
- "utf8parse",
-]
-
-[[package]]
-name = "anstyle"
-version = "1.0.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
-
-[[package]]
-name = "anstyle-parse"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
-dependencies = [
- "utf8parse",
-]
-
-[[package]]
-name = "anstyle-query"
-version = "1.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
-dependencies = [
- "windows-sys 0.59.0",
-]
-
-[[package]]
-name = "anstyle-wincon"
-version = "3.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
-dependencies = [
- "anstyle",
- "once_cell",
- "windows-sys 0.59.0",
-]

 [[package]]
 name = "any_ascii"
 version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ea50b14b7a4b9343f8c627a7a53c52076482bd4bdad0a24fd3ec533ed616cc2c"

+[[package]]
+name = "anyhow"
+version = "1.0.98"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
+
 [[package]]
 name = "approx"
 version = "0.4.0"
@@ -325,17 +281,6 @@ dependencies = [
  "serde_json",
 ]
-
-[[package]]
-name = "async-recursion"
-version = "1.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.100",
-]

 [[package]]
 name = "async-stream"
 version = "0.3.6"
@@ -815,12 +760,6 @@ dependencies = [
  "cc",
 ]
-
-[[package]]
-name = "colorchoice"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"

 [[package]]
 name = "concurrent-queue"
 version = "2.5.0"
@@ -871,21 +810,6 @@ dependencies = [
  "libc",
 ]
-
-[[package]]
-name = "crc"
-version = "3.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636"
-dependencies = [
- "crc-catalog",
-]
-
-[[package]]
-name = "crc-catalog"
-version = "2.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"

 [[package]]
 name = "crc32fast"
 version = "1.4.2"
@@ -984,20 +908,6 @@ dependencies = [
  "parking_lot_core",
 ]
-
-[[package]]
-name = "dashmap"
-version = "6.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
-dependencies = [
- "cfg-if",
- "crossbeam-utils",
- "hashbrown 0.14.5",
- "lock_api",
- "once_cell",
- "parking_lot_core",
-]

 [[package]]
 name = "data-encoding"
 version = "2.8.0"
@@ -1014,17 +924,6 @@ dependencies = [
  "serde",
 ]
-
-[[package]]
-name = "derivative"
-version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 1.0.109",
-]

 [[package]]
 name = "deunicode"
 version = "1.6.1"
@@ -1135,29 +1034,6 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"

-[[package]]
-name = "env_filter"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
-dependencies = [
- "log",
- "regex",
-]
-
-[[package]]
-name = "env_logger"
-version = "0.11.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697"
-dependencies = [
- "anstream",
- "anstyle",
- "env_filter",
- "jiff",
- "log",
-]
-
 [[package]]
 name = "equivalent"
 version = "1.0.2"
@@ -1727,6 +1603,19 @@ dependencies = [
  "webpki-roots",
 ]

+[[package]]
+name = "hyper-timeout"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
+dependencies = [
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
 [[package]]
 name = "hyper-tls"
 version = "0.6.0"
@@ -1970,7 +1859,10 @@ dependencies = [
  "html5ever 0.29.1",
  "metrics",
  "metrics-exporter-prometheus",
- "minio",
+ "opentelemetry",
+ "opentelemetry-otlp",
+ "opentelemetry_sdk",
+ "rand 0.9.1",
  "reqwest",
  "serde",
  "surrealdb",
@@ -1987,12 +1879,6 @@ version = "2.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"

-[[package]]
-name = "is_terminal_polyfill"
-version = "1.70.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
-
 [[package]]
 name = "itertools"
 version = "0.10.5"
@@ -2026,30 +1912,6 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"

-[[package]]
-name = "jiff"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e"
-dependencies = [
- "jiff-static",
- "log",
- "portable-atomic",
- "portable-atomic-util",
- "serde",
-]
-
-[[package]]
-name = "jiff-static"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.100",
-]
-
 [[package]]
 name = "jobserver"
 version = "0.1.32"
@@ -2294,12 +2156,6 @@ dependencies = [
  "digest",
 ]
-
-[[package]]
-name = "md5"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"

 [[package]]
 name = "memchr"
 version = "2.7.4"
@@ -2398,46 +2254,6 @@ version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"

-[[package]]
-name = "minio"
-version = "0.2.0-alpha"
-source = "git+https://github.com/minio/minio-rs.git?rev=c28f576#c28f576cb8f8cf47fb941bb9db62b2cbd6f080c1"
-dependencies = [
- "async-recursion",
- "async-trait",
- "base64 0.22.1",
- "byteorder",
- "bytes",
- "chrono",
- "crc",
- "dashmap 6.1.0",
- "derivative",
- "env_logger",
- "futures-util",
- "hex",
- "hmac",
- "home",
- "http",
- "hyper",
- "lazy_static",
- "log",
- "md5",
- "multimap",
- "os_info",
- "percent-encoding",
- "rand 0.8.5",
- "regex",
- "reqwest",
- "serde",
- "serde_json",
- "sha2",
- "tokio",
- "tokio-stream",
- "tokio-util",
- "urlencoding",
- "xmltree",
-]
-
 [[package]]
 name = "miniz_oxide"
 version = "0.8.5"
@@ -2475,15 +2291,6 @@ dependencies = [
  "version_check",
 ]
-
-[[package]]
-name = "multimap"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
-dependencies = [
- "serde",
-]

 [[package]]
 name = "nanoid"
 version = "0.4.0"
@@ -2736,14 +2543,77 @@ dependencies = [
 ]

 [[package]]
-name = "os_info"
-version = "3.10.0"
+name = "opentelemetry"
+version = "0.30.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a604e53c24761286860eba4e2c8b23a0161526476b1de520139d69cdb85a6b5"
+checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
 dependencies = [
- "log",
- "serde",
- "windows-sys 0.52.0",
+ "futures-core",
+ "futures-sink",
+ "js-sys",
+ "pin-project-lite",
+ "thiserror 2.0.12",
+ "tracing",
 ]
+
+[[package]]
+name = "opentelemetry-http"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "http",
+ "opentelemetry",
+ "reqwest",
+]
+
+[[package]]
+name = "opentelemetry-otlp"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
+dependencies = [
+ "http",
+ "opentelemetry",
+ "opentelemetry-http",
+ "opentelemetry-proto",
+ "opentelemetry_sdk",
+ "prost",
+ "reqwest",
+ "thiserror 2.0.12",
+ "tokio",
+ "tonic",
+ "tracing",
+]
+
+[[package]]
+name = "opentelemetry-proto"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
+dependencies = [
+ "opentelemetry",
+ "opentelemetry_sdk",
+ "prost",
+ "tonic",
+]
+
+[[package]]
+name = "opentelemetry_sdk"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
+dependencies = [
+ "futures-channel",
+ "futures-executor",
+ "futures-util",
+ "opentelemetry",
+ "percent-encoding",
+ "rand 0.9.1",
+ "serde_json",
+ "thiserror 2.0.12",
+]

 [[package]]
@@ -2923,6 +2793,26 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315"

+[[package]]
+name = "pin-project"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.100",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.16"
@@ -2947,15 +2837,6 @@ version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"

-[[package]]
-name = "portable-atomic-util"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
-dependencies = [
- "portable-atomic",
-]
-
 [[package]]
 name = "powerfmt"
 version = "0.2.0"
@@ -3005,6 +2886,29 @@ dependencies = [
  "unicode-ident",
 ]

+[[package]]
+name = "prost"
+version = "0.13.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.13.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
+dependencies = [
+ "anyhow",
+ "itertools 0.13.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.100",
+]
+
 [[package]]
 name = "psl-types"
 version = "2.0.11"
@@ -3095,7 +2999,7 @@ checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
 dependencies = [
  "bytes",
  "getrandom 0.3.2",
- "rand 0.9.0",
+ "rand 0.9.1",
  "ring",
  "rustc-hash 2.1.1",
  "rustls",
@@ -3166,13 +3070,12 @@ dependencies = [

 [[package]]
 name = "rand"
-version = "0.9.0"
+version = "0.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
+checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
 dependencies = [
  "rand_chacha 0.9.0",
  "rand_core 0.9.3",
- "zerocopy 0.8.23",
 ]

 [[package]]
@@ -3366,6 +3269,7 @@ dependencies = [
  "base64 0.22.1",
  "bytes",
  "encoding_rs",
+ "futures-channel",
  "futures-core",
  "futures-util",
  "h2",
@@ -4196,7 +4100,7 @@ dependencies = [
  "cedar-policy",
  "chrono",
  "ciborium",
- "dashmap 5.5.3",
+ "dashmap",
  "deunicode",
  "dmp",
  "fst",
@@ -4619,6 +4523,32 @@ dependencies = [
  "winnow",
 ]

+[[package]]
+name = "tonic"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
+dependencies = [
+ "async-trait",
+ "base64 0.22.1",
+ "bytes",
+ "http",
+ "http-body",
+ "http-body-util",
+ "hyper",
+ "hyper-timeout",
+ "hyper-util",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "tokio",
+ "tokio-stream",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "tower"
 version = "0.5.2"
@@ -4627,11 +4557,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
 dependencies = [
  "futures-core",
  "futures-util",
+ "indexmap 2.8.0",
  "pin-project-lite",
+ "slab",
  "sync_wrapper",
  "tokio",
+ "tokio-util",
  "tower-layer",
  "tower-service",
+ "tracing",
 ]

 [[package]]
@@ -4777,7 +4711,7 @@ version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
 dependencies = [
- "rand 0.9.0",
+ "rand 0.9.1",
  "serde",
  "web-time",
 ]
@@ -4873,12 +4807,6 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"

-[[package]]
-name = "utf8parse"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
-
 [[package]]
 name = "uuid"
 version = "1.16.0"
@@ -5419,21 +5347,6 @@ dependencies = [
  "tap",
 ]

-[[package]]
-name = "xml-rs"
-version = "0.8.25"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5b940ebc25896e71dd073bad2dbaa2abfe97b0a391415e22ad1326d9c54e3c4"
-
-[[package]]
-name = "xmltree"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b619f8c85654798007fb10afa5125590b43b088c225a25fc2fec100a9fad0fc6"
-dependencies = [
- "xml-rs",
-]
-
 [[package]]
 name = "yoke"
 version = "0.7.5"

Cargo.toml

@@ -9,8 +9,10 @@ futures-util = "0.3.31"
 html5ever = "0.29"
 metrics = "0.24.1"
 metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]}
-# minio = "0.1.0"
-minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"}
+opentelemetry = "0.30.0"
+opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] }
+opentelemetry_sdk = "0.30.0"
+rand = "0.9.1"
 reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] }
 serde = { version = "1.0", features = ["derive"] }
 surrealdb = "2.2"

Crawler.toml

@@ -1,11 +1,22 @@
+# Visability config
+# Alloy (for Tempo)
+tracing_endpoint = "http://localhost:4317"
+# Prometheus
+metrics_endpoint = "http://localhost:9090/api/v1/otlp/v1/metrics"
+# Alloy (for Loki)
+log_file = "./docker/logs/tracing.log"
+
 # Surreal config
 surreal_url = "localhost:8000"
 surreal_username = "root"
 surreal_password = "root"
 surreal_ns = "test"
-surreal_db = "v1.19.5"
+surreal_db = "v1.21.1"

 # Crawler config
-crawl_filter = "en.wikipedia.org"
-start_url = "https://en.wikipedia.org"
+crawl_filter = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
+# crawl_filter = "https://oliveratkinson.net"
+start_url = "https://ftpgeoinfo.msl.mt.gov/Data/Spatial/MSDI/Imagery/2023_NAIP/UTM_County_Mosaics/"
+# start_url = "https://oliveratkinson.net"
 budget = 100
+batch_size = 2
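
These keys line up with the Config struct that src/main.rs deserializes at startup (the CONFIG static in the main.rs diff further down). A minimal sketch of that mapping; field types are inferred from how the values are used, and surreal_username/surreal_password are assumed from the TOML keys since the struct is truncated in the diff:

use serde::Deserialize;

#[derive(Deserialize)]
struct Config {
    // Observability endpoints (new in this compare)
    tracing_endpoint: String,
    metrics_endpoint: String,
    log_file: String,
    // SurrealDB connection
    surreal_url: String,
    surreal_username: String, // assumed field, mirrors the TOML key
    surreal_password: String, // assumed field, mirrors the TOML key
    surreal_ns: String,
    surreal_db: String,
    // Crawler behaviour
    crawl_filter: String,
    start_url: String,
    budget: usize,     // total pages to crawl
    batch_size: usize, // number of concurrent worker tasks
}

fn load_config() -> Config {
    // mirrors the CONFIG LazyLock in src/main.rs
    let buf = std::fs::read_to_string("./Crawler.toml").expect("Failed to read Crawler.toml");
    toml::from_str(&buf).expect("Failed to parse Crawler.toml")
}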

README.md

@@ -40,14 +40,18 @@ $EDITOR Crawler.toml
 - [x] Allow for storing asynchronously - dropping the "links to" logic fixes this need
 - [x] Control crawler via config file (no recompliation needed)

+### Feats
+
-3/17/25: Took >1hr to crawl 100 pages
-3/19/25: Took 20min to crawl 1000 pages
+3/17/25: Took >1hr to crawl 100 pages.
+3/19/25: Took 20min to crawl 1000 pages.
 This ment we stored 1000 pages, 142,997 urls, and 1,425,798 links between the two.
-3/20/25: Took 5min to crawl 1000 pages
-3/21/25: Took 3min to crawl 1000 pages
+3/20/25: Took 5min to crawl 1000 pages.
+3/21/25: Took 3min to crawl 1000 pages.
+7/.../25: Downloaded just shy of 12TB of data from a remote server.

 # About

docker/alloy.conf

@@ -12,3 +12,25 @@ loki.write "local_loki" {
     url = "http://loki:3100/loki/api/v1/push"
   }
 }
+
+otelcol.receiver.otlp "otlp_receiver" {
+  grpc {
+    endpoint = "0.0.0.0:4317"
+  }
+  http {
+    endpoint = "0.0.0.0:4318"
+  }
+  output {
+    traces = [otelcol.exporter.otlp.tempo.input,]
+  }
+}
+
+otelcol.exporter.otlp "tempo" {
+  client {
+    endpoint = "tempo:4317"
+    tls {
+      insecure = true
+    }
+  }
+}

Docker Compose file (docker/)

@@ -1,4 +1,6 @@
 services:
+  # Database
   surreal:
     image: surrealdb/surrealdb:latest-dev
     ports:
@@ -15,6 +17,18 @@ services:
       - root
       - rocksdb:/mydata/database.db

+  # Tracing
+  tempo:
+    image: grafana/tempo:latest
+    command: [ "-config.file=/etc/tempo.yaml" ]
+    volumes:
+      - ./tempo.yaml:/etc/tempo.yaml
+      - tempo_storage:/var/tempo
+    ports:
+      - 3200:3200 # self metrics for prometheus
+      - 4317:4317 # otlp grpc - (alloy)
+
+  # Log scraper
   alloy:
     image: grafana/alloy:latest
     ports:
@@ -24,9 +38,13 @@ services:
       - ./logs/:/tmp/alloy-logs
       - ./alloy.conf:/etc/alloy/config.alloy
      - alloy_storage:/var/lib/alloy
-    command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy
+    command:
+      - run
+      - --server.http.listen-addr=0.0.0.0:12345
+      - --storage.path=/var/lib/alloy/data
+      - /etc/alloy/config.alloy

-  #logs
+  # Log storage / analysis
   loki:
     image: grafana/loki:latest
     ports:
@@ -35,16 +53,21 @@ services:
     volumes:
       - ./loki.yaml:/etc/loki/local-config.yaml

-  # Metrics collector
+  # Metrics
   prometheus:
     image: prom/prometheus:latest
-    expose:
-      - 9090
+    ports:
+      - 9090:9090
     volumes:
       - ./prometheus.yaml:/etc/prometheus/prometheus.yml
       # persist data
-      - prometheus_storage:/prometheus
-    command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml
+      # - prometheus_storage:/prometheus
+    command:
+      - --enable-feature=native-histograms
+      - --web.enable-remote-write-receiver
+      - --web.enable-lifecycle
+      - --web.enable-otlp-receiver
+      - --config.file=/etc/prometheus/prometheus.yml

   # Everything viewer
   grafana:
@@ -66,3 +89,4 @@ volumes:
   grafana_storage:
   alloy_storage:
   surrealdb_storage:
+  tempo_storage:

Grafana datasource provisioning (YAML)

@@ -22,3 +22,20 @@ datasources:
     editable: false
     jsonData:
       httpMethod: GET
+  - name: Tempo
+    type: tempo
+    access: proxy
+    orgId: 1
+    url: http://tempo:3200
+    basicAuth: false
+    isDefault: false
+    version: 1
+    editable: true
+    apiVersion: 1
+    uid: tempo
+    jsonData:
+      httpMethod: GET
+      serviceMap:
+        datasourceUid: prometheus
+      streamingEnabled:
+        search: true

docker/prometheus.yaml

@@ -1,17 +1,15 @@
 global:
-  scrape_interval: 5s
+  scrape_interval: 60s
   query_log_file: /etc/prometheus/query.log

 scrape_configs:
-  - job_name: crawler
+  # Crawler configs get pushed with OTLP
+  # - job_name: 'loki'
+  #   static_configs:
+  #     - targets: ['loki:3100']
+  # - job_name: 'prometheus'
+  #   static_configs:
+  #     - targets: ['localhost:9090']
+  - job_name: 'tempo'
     static_configs:
-      # change this your machine's ip, localhost won't work
-      # because localhost refers to the docker container.
-      - targets: ['172.20.239.48:2500']
-      #- targets: ['192.168.8.209:2500']
-  - job_name: loki
-    static_configs:
-      - targets: ['loki:3100']
-  - job_name: prometheus
-    static_configs:
-      - targets: ['localhost:9090']
+      - targets: ['tempo:3200']
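
The crawler job disappears from scrape_configs because the metrics now travel the other way: the crawler pushes OTLP over HTTP to Prometheus's receiver (enabled by the --web.enable-otlp-receiver flag in the compose diff above), at the metrics_endpoint configured in Crawler.toml. The push side, condensed from load_metrics() in the src/main.rs diff below:

// Condensed from load_metrics() in src/main.rs; the endpoint is
// "http://localhost:9090/api/v1/otlp/v1/metrics" from Crawler.toml.
let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
    .with_http()
    .with_protocol(Protocol::HttpBinary)
    .with_endpoint(config.metrics_endpoint.clone())
    .build()
    .unwrap();
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
    .with_periodic_exporter(otlp_metrics)
    .build();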

docker/tempo.yaml (new file, 48 lines)

@@ -0,0 +1,48 @@
stream_over_http_enabled: true
server:
  http_listen_port: 3200
  log_level: info

query_frontend:
  search:
    duration_slo: 5s
    throughput_bytes_slo: 1.073741824e+09
    metadata_slo:
      duration_slo: 5s
      throughput_bytes_slo: 1.073741824e+09
  trace_by_id:
    duration_slo: 5s

distributor:
  receivers:
    otlp:
      protocols:
        grpc:
          endpoint: "tempo:4317"

metrics_generator:
  registry:
    external_labels:
      source: tempo
      cluster: docker-compose
  storage:
    path: /var/tempo/generator/wal
    remote_write:
      - url: http://prometheus:9090/api/v1/write
        send_exemplars: true
  traces_storage:
    path: /var/tempo/generator/traces

storage:
  trace:
    backend: local # backend configuration to use
    wal:
      path: /var/tempo/wal # where to store the wal locally
    local:
      path: /var/tempo/blocks

overrides:
  defaults:
    metrics_generator:
      processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
      generate_native_histograms: both

src/db.rs

@@ -1,12 +1,13 @@
 use metrics::counter;
-use std::fmt::Debug;
 use serde::{Deserialize, Serialize};
+use std::{fmt::Debug, time::Duration};
 use surrealdb::{
     engine::remote::ws::{Client, Ws},
     opt::auth::Root,
     sql::Thing,
     Surreal,
 };
+use tokio::time::sleep;
 use tracing::{error, instrument, trace};
 use url::Url;
@@ -16,16 +17,23 @@ const STORE: &str = "surql_store_calls";
 #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
 pub struct Website {
+    pub id: Option<Thing>,
     /// The url that this data is found at
     pub site: Url,
     /// Wether or not this link has been crawled yet
     pub crawled: bool,
+    /// 200, 404, etc
+    pub status_code: u16,
 }

 // manual impl to make tracing look nicer
 impl Debug for Website {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Website").field("site", &self.site).finish()
+        f.debug_struct("Website")
+            .field("host", &self.site.host())
+            .field("path", &self.site.path())
+            .field("status_code", &self.status_code)
+            .finish()
     }
 }
@@ -38,15 +46,12 @@ impl Website {
         };
         Self {
             crawled,
-            site
+            site,
+            status_code: 0,
+            id: None,
         }
     }

-    pub fn set_crawled(&mut self) {
-        trace!("Set crawled to true");
-        self.crawled = true
-    }
-
     // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
     // if already in the database as such or incoming data is TRUE.
     #[instrument(skip(db))]
@@ -59,6 +64,8 @@ impl Website {
             "INSERT INTO website $array
                 ON DUPLICATE KEY UPDATE
                     accessed_at = time::now(),
+                    status_code = $input.status_code,
+                    processing = false,
                     crawled = crawled OR $input.crawled
                 RETURN VALUE id;
             ",
@@ -78,18 +85,47 @@ impl Website {
     }
 }

+/// Returns uncrawled links
+#[instrument(skip(db, config))]
+pub async fn get_next(db: &Surreal<Client>, config: &Config) -> Option<Website> {
+    let mut res: Option<Website> = None;
+    let mut fails = 0;
+    while res == None {
+        let mut response = db
+            .query("fn::get_next($format)")
+            .bind(("format", config.crawl_filter.to_string()))
+            .await
+            .expect("Hard-coded query failed..?");
+        res = match response.take(0) {
+            Ok(ok) => ok,
+            Err(_err) => {
+                // basically just CSMA/CA
+                let delay = rand::random_range(10..10_000);
+                sleep(Duration::from_millis(delay)).await;
+                fails += 1;
+                // Don't get stuck here forever, failing...
+                // (most I've seen is 1)
+                if fails > 5 {
+                    error!("Max attempts to get_next() reached... ({fails})");
+                    return None
+                }
+                None
+            }
+        };
+    }
+    res
+}
+
 #[derive(Debug, Serialize)]
+#[allow(dead_code)]
 pub struct Email {
     pub email: String,
     pub on: String,
 }

-#[derive(Debug, Deserialize)]
-pub struct Record {
-    #[allow(dead_code)]
-    pub id: Thing,
-}
-
 #[instrument(skip_all, name = "SurrealDB")]
 pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
     trace!("Establishing connection to surreal...");
@@ -110,12 +146,11 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
         .await?;

     let setup = include_bytes!("setup.surql");
-    let file = setup.iter().map(|c| *c as char).collect::<String>();
+    let init_commands = setup.iter().map(|c| *c as char).collect::<String>();

-    db.query(file)
+    db.query(init_commands)
         .await
         .expect("Failed to setup surreal tables.");

     Ok(db)
 }
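
The retry loop in get_next() is randomized backoff: when several workers race on fn::get_next and the query errors, each one sleeps a random 10 ms to 10 s before retrying, which spreads the contenders out in time (the "basically just CSMA/CA" comment). The same pattern in isolation, as a sketch with illustrative names rather than crate API:

use std::time::Duration;
use tokio::time::sleep;

/// Retry `attempt` with a random delay between tries, giving up after
/// `max_fails` failures; mirrors the loop inside get_next().
async fn with_random_backoff<T, F, Fut>(mut attempt: F, max_fails: u32) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    let mut fails = 0;
    loop {
        if let Some(v) = attempt().await {
            return Some(v);
        }
        fails += 1;
        if fails > max_fails {
            return None; // same escape hatch as get_next()'s `fails > 5`
        }
        // a random delay de-synchronizes competing workers
        let delay = rand::random_range(10..10_000);
        sleep(Duration::from_millis(delay)).await;
    }
}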

src/filesystem.rs

@@ -1,40 +1,79 @@
-use std::{ffi::OsStr, io::ErrorKind, path::PathBuf};
+use std::{io::ErrorKind, path::PathBuf};

+use reqwest::header::HeaderValue;
 use tokio::fs;
-use tracing::{error, trace};
+use tracing::{error, trace, warn};
 use url::Url;

-pub fn as_path(url: &Url) -> PathBuf {
+pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
     // extract data from url to save it accurately
-    let url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
-
-    // if it's a file
-    let (basepath, filename) = if url_path.extension().filter(valid_file_extension).is_some() {
-        // get everything up till the file
-        let basepath = url_path.ancestors().skip(1).take(1).collect::<PathBuf>();
-        // get the file name
-        let filename = url_path.file_name().expect("This should exist").to_string_lossy();
-        trace!("Save path: {:?} and base path: {:?}", &url_path, &basepath);
-        (basepath, filename.to_string())
-    } else {
-        (url_path.clone(), "index.html".into())
-    };
-
-    let mut path = PathBuf::new();
-    path = path.join(basepath);
-    path = path.join(filename);
-
-    path
+    let mut url_path = PathBuf::from("./downloaded/".to_string() + url.domain().unwrap_or("UnknownDomain") + url.path());
+
+    if let Ok(header) = content_type.to_str() {
+        // text/html; charset=UTF-8; option=value
+        let ttype = if let Some((t, _)) = header.split_once(';') {
+            t
+        } else {
+            header
+        };
+
+        if let Some((ttype, subtype)) = ttype.split_once('/') {
+            trace!(url = url.to_string(), main_type = ttype, sub_type = subtype, "Found Content-Type to be: {ttype}/{subtype}");
+            // If the Content-Type header is "*/html" (most likely "text/html") and the path's
+            // extension is anything but html:
+            if subtype=="html" && !url_path.extension().is_some_and(|f| f=="html" || f=="htm" ) {
+                // time to slap a index.html to the end of that path there!
+                url_path = url_path.join("index.html");
+            }
+        }
+    } else {
+        warn!("Header: {:?} couldn't be parsed into a string!", content_type);
+    }
+    trace!(url = url.to_string(), path = &*url_path.to_string_lossy(), "Converted URL into path");
+
+    url_path
+}
+
+pub async fn check_file_length(file: &PathBuf) -> Option<u64> {
+    match tokio::fs::OpenOptions::new()
+        .write(false)
+        .read(true)
+        .create(false)
+        .open(file).await
+    {
+        Ok(file) => {
+            match file.metadata().await {
+                Ok(meta) => {
+                    return Some(meta.len())
+                },
+                Err(err) => {
+                    error!("Failed to get metadata. {}", err)
+                },
+            }
+        },
+        Err(err) => {
+            match err.kind() {
+                ErrorKind::NotFound => {/* ignore */},
+                _ => warn!("Failed to open file to check length. {:?} {}", file, err),
+            }
+        },
+    }
+    None
 }

 pub async fn init(filename: &PathBuf) -> Option<fs::File> {
     let file = async || tokio::fs::OpenOptions::new()
-        .append(true)
+        .write(true)
+        .append(false)
         .create(true)
         .open(&filename).await;

     match file().await {
-        Ok(ok) => Some(ok),
+        Ok(ok) => {
+            trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
+            Some(ok)
+        },
         Err(err) => {
             // the file/folder isn't found
             if err.kind() == ErrorKind::NotFound {
@@ -42,37 +81,25 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
                 // create the folders
                 if let Err(err) = fs::create_dir_all(&parent).await {
                     error!("Dir creation: {err} {:?}", filename);
-                    eprintln!("{}", err)
                 } else if let Ok(ok) = file().await {
                     return Some(ok);
                 }
             } else {
                 error!("Couldn't get file's parents: {:?}", &filename);
             }
+        } else if err.kind() == ErrorKind::NotADirectory {
+            // Example:
+            // 1. example.com/user
+            // 2. example.com/user/post
+            // If file 1 exists it will prevent file 2 from existing
+            // FIXME
+            error!("One of the parent directories is actually a file...")
         } else {
-            error!("File creation: {err} {:?}", filename);
+            error!("File open error: {err} {:?}", filename);
         }
         // we don't care about other errors, we can't/shouldn't fix them
         None
     }
 }
-
-fn valid_file_extension(take: &&OsStr) -> bool {
-    let los = take.to_string_lossy();
-    let all = los.split('.');
-    match all.last() {
-        Some(s) => {
-            // FIXME it's worth noting that the dumb tlds like .zip are in here,
-            // which could cause problems
-            let all_domains = include_str!("tlds-alpha-by-domain.txt");
-            // check if it is a domain
-            match all_domains.lines().map(str::to_lowercase).find(|x| x==s.to_lowercase().as_str()) {
-                Some(_) => false,
-                None => true
-            }
-        },
-        None => false,
-    }
-}
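
The rewritten as_path() keys the index.html decision off the Content-Type header instead of the old TLD-based extension check. Two illustrative calls, assuming the logic above (URLs and headers are made up for the example):

use reqwest::header::HeaderValue;
use url::Url;

fn as_path_examples() {
    let html = HeaderValue::from_static("text/html; charset=UTF-8");
    let tiff = HeaderValue::from_static("image/tiff");

    // Extensionless page + */html subtype => index.html is appended:
    // "./downloaded/example.com/docs/index.html"
    let page = Url::parse("https://example.com/docs/").unwrap();
    let _ = as_path(&page, &html);

    // Non-html subtype => the path keeps its own file name:
    // "./downloaded/example.com/data/mosaic.tif"
    let file = Url::parse("https://example.com/data/mosaic.tif").unwrap();
    let _ = as_path(&file, &tiff);
}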

src/main.rs

@@ -1,39 +1,83 @@
 #![feature(ip_from)]
-#![feature(async_closure)]
+#![feature(path_add_extension)]
+#![warn(clippy::expect_used)]
 #![deny(clippy::unwrap_used)]

 extern crate html5ever;

-use futures_util::StreamExt;
 use std::{
     collections::HashSet,
     fs::File,
     io::Read,
-    net::{IpAddr, Ipv4Addr},
+    sync::{Arc, LazyLock},
 };

 use db::{connect, Website};
-use metrics::{counter, gauge};
-use metrics_exporter_prometheus::PrometheusBuilder;
+use futures_util::StreamExt;
+use opentelemetry::{
+    global::{self},
+    metrics::{Counter, Meter, UpDownCounter},
+};
+use opentelemetry_otlp::{Protocol, WithExportConfig};
+use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{io::AsyncWriteExt, task::JoinSet};
-use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, trace_span};
+use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
+    sync::RwLock,
+    task::JoinSet,
+};
+use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};

+use crate::db::get_next;
+
 mod db;
 mod filesystem;
 mod parser;

-const GET_METRIC: &str = "total_gets";
-const GET_IN_FLIGHT: &str = "gets_in_flight";
-const SITES_CRAWLED: &str = "pages_crawled";
-const BEING_PROCESSED: &str = "pages_being_processed";
+static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
+
+static BATCH_SIZE: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_batch_size").build());
+static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
+    METER
+        .i64_up_down_counter("crawler_pages_being_processed")
+        .build()
+});
+static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
+    METER
+        .i64_up_down_counter("crawler_pages_being_parsed")
+        .build()
+});
+static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
+    METER
+        .i64_up_down_counter("crawler_pages_being_streamed")
+        .build()
+});
+static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
+    LazyLock::new(|| METER.i64_up_down_counter("crawler_gets_in_flight").build());
+static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_bytes_down").build());
+static SITES_CRAWLED: LazyLock<Counter<u64>> =
+    LazyLock::new(|| METER.u64_counter("crawler_total_sites_crawled").build());
+
+static CONFIG: LazyLock<Config> = LazyLock::new(|| {
+    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
+    let mut buf = String::new();
+    let _ = file.read_to_string(&mut buf);
+
+    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
+    config
+});
+
+// FIXME Traces aren't working on multiple threads, they block
+// static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));

 #[derive(Deserialize)]
 struct Config {
+    tracing_endpoint: String,
+    metrics_endpoint: String,
+    log_file: String,
+
     surreal_ns: String,
     surreal_db: String,
     surreal_url: String,
@@ -50,52 +94,21 @@ struct Config {
 async fn main() {
     println!("Logs and metrics are provided to the Grafana dashboard");

-    let writer = std::fs::OpenOptions::new()
-        .append(true)
-        .create(true)
-        .open("./docker/logs/tracing.log")
-        .expect("Couldn't make log file!");
-
-    let filter = EnvFilter::builder()
-        .with_default_directive(LevelFilter::DEBUG.into())
-        .from_env_lossy();
-
-    let registry = Registry::default().with(
-        fmt::layer()
-            .with_line_number(true)
-            .with_thread_ids(true)
-            .with_file(true)
-            .json()
-            .with_writer(writer)
-            .with_filter(filter)
-    );
-
-    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
-
-    let builder = PrometheusBuilder::new();
-    builder
-        .with_http_listener(std::net::SocketAddr::new(
-            IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])),
-            2500,
-        ))
-        .install()
-        .expect("failed to install recorder/exporter");
-
-    info!("Starting...");
+    // Start TRACE / LOGGING / METRICS
+    load_logging(&CONFIG); // this seems to be working ok
+    global::set_tracer_provider(load_tracing(&CONFIG));
+    global::set_meter_provider(load_metrics(&CONFIG));
+
+    BATCH_SIZE.add(CONFIG.batch_size as u64, &[]);

     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
     // let budget = 50;
-    let mut crawled = 0;
-
-    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
-    let mut buf = String::new();
-    let _ = file.read_to_string(&mut buf);
-
-    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");
-    let starting_url = &config.start_url;
+    let crawled = Arc::new(RwLock::new(0));
+
+    let starting_url = &CONFIG.start_url;

-    let db = connect(&config)
+    let db = connect(&CONFIG)
         .await
         .expect("Failed to connect to surreal, aborting.");
@@ -107,101 +120,208 @@ async fn main() {
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
-    let span = trace_span!("Pre-Loop");
-    let pre_loop_span = span.enter();
-    // Download the site
+    // let mut span = TRACER.start("Pre-Loop");
     let site = Website::new(starting_url, false);
     process(site, db.clone(), reqwest.clone()).await;
-
-    drop(pre_loop_span);
-
-    let span = trace_span!("Loop");
-    let span = span.enter();
-    while crawled < config.budget {
-        let uncrawled =
-            get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
-        if uncrawled.is_empty() {
-            info!("Had more budget but finished crawling everything.");
-            return;
-        }
-
-        {
-            let mut futures = JoinSet::new();
-            for site in uncrawled {
-                gauge!(BEING_PROCESSED).increment(1);
-                futures.spawn(process(site, db.clone(), reqwest.clone()));
-                // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
-                // info!("Crawled {crawled} out of {budget} pages. ({percent})");
-            }
-
-            let c = counter!(SITES_CRAWLED);
-            // As futures complete runs code in while block
-            while futures.join_next().await.is_some() {
-                c.increment(1);
-                gauge!(BEING_PROCESSED).decrement(1);
-                crawled += 1;
-            }
-        }
-    }
-    drop(span);
-
-    if let Ok(mut ok) = db
-        .query("count(select id from website where crawled = true)")
-        .await
-    {
-        let res = ok.take::<Option<usize>>(0);
-        if let Ok(Some(n)) = res {
-            info!("Total crawled pages now equals {n}");
-        }
-    }
+    // span.end();
+
+    // let mut main_loop_span= TRACER.start("Main-Loop");
+    let mut futures = JoinSet::new();
+    for _ in 0..CONFIG.batch_size {
+        futures.spawn(process_single_thread(
+            &CONFIG,
+            db.clone(),
+            reqwest.clone(),
+            crawled.clone(),
+        ));
+    }
+
+    while let Some(_) = futures.join_next().await {
+        // Budget - Threads - This thread (1)
+        // Would roughly be the acceptable amount at which a thread should exit
+        if *(crawled.read().await) < CONFIG.budget - CONFIG.batch_size - 1 {
+            warn!("Thread terminated early, restarting");
+            futures.spawn(process_single_thread(
+                &CONFIG,
+                db.clone(),
+                reqwest.clone(),
+                crawled.clone(),
+            ));
+        }
+    }
+
+    futures.join_all().await;
+    // main_loop_span.end();

     info!("Done");
 }

+async fn process_single_thread(
+    config: &Config,
+    db: Surreal<Client>,
+    reqwest: reqwest::Client,
+    crawled: Arc<RwLock<usize>>,
+) {
+    while *(crawled.read().await) < config.budget {
+        let uncrawled = get_next(&db.clone(), &config).await;
+        match uncrawled {
+            Some(site) => {
+                process(site, db.clone(), reqwest.clone()).await;
+                SITES_CRAWLED.add(1, &[]);
+                // Somehow this write doesn't hang on the while's read?
+                let mut c = crawled.write().await;
+                *c += 1;
+            }
+            None => {
+                warn!("fn::get_next() returned None");
+                return;
+            }
+        }
+    }
+}
+
 #[instrument(skip(db, reqwest))]
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
-    // METRICS
-    trace!("Process: {}", &site.site);
+    debug!(url = &site.site.as_str(), "Process: {}", &site.site);
+    BEING_PROCESSED.add(1, &[]);
+    // let mut process_span = TRACER.start("Process");

     // Build the request
     let request_builder = reqwest.get(site.site.to_string());

-    // METRICS
-    let g = gauge!(GET_IN_FLIGHT);
-    g.increment(1);
-
     // Send the http request (get)
+    GET_IN_FLIGHT.add(1, &[]);
     if let Ok(response) = request_builder.send().await {
-        // Get body from response
-        let path = filesystem::as_path(&site.site);
+        let mut skip_download = false;
+        GET_IN_FLIGHT.add(-1, &[]);
+
+        let headers = response.headers();
+        let code = response.status();
+        if code != 200 {
+            warn!("{code} for {}", site.site.as_str());
+        }
+
+        #[allow(non_snake_case)]
+        let CT = headers.get("Content-Type");
+        let ct = headers.get("content-type");
+
+        let ct = match (CT, ct) {
+            (None, None) => {
+                warn!(
+                    "Server did not respond with Content-Type header. Url: {} Headers: ({:?})",
+                    site.site.to_string(),
+                    headers
+                );
+                return;
+            }
+            (None, Some(a)) => a,
+            (Some(a), None) => a,
+            (Some(a), Some(_)) => a,
+        };
+
+        // create filepath (handles / -> /index.html)
+        let real_path = filesystem::as_path(&site.site, ct);
+        let mut tmp_path = real_path.clone();
+        if !(tmp_path.add_extension("crawl_temp")) {
+            warn!("Failed to add extension to file");
+            // fallback ig
+            tmp_path = tmp_path.with_extension("crawl_temp");
+        }
+
+        // CODE FOR UPDATING DOWNLOADED CONTENT:
+        // Check the Content-Length header (we assume the server is telling the truth) (I don't see
+        // a reason for it to lie in this case).
+        // And see if the file on the disk is the same length.
+        // Yes, technically this isn't perfect, but the other option is storing ETags, which I
+        // don't want to do right now.
+        if let Some(len) = headers.get("Content-Length") {
+            if let Ok(s) = len.to_str() {
+                // length is in bytes
+                if let Ok(len) = s.parse::<u64>() {
+                    if let Some(disk_len) = filesystem::check_file_length(&real_path).await {
+                        if disk_len == len {
+                            skip_download = true;
+                        }
+                    } else {
+                        // File not found (or other error).
+                        // Program will continue on it's way, downloading content.
+                    }
+                }
+            }
+        }

         // make sure that the file is good to go
-        if let Some(mut file) = filesystem::init(&path).await {
-            let should_parse = path.to_string_lossy().ends_with(".html");
-            let mut buf: Vec<u8> = Vec::new();
-
+        if let Some(file) = filesystem::init(&tmp_path).await {
+            // Get body from response
             // stream the response onto the disk
             let mut stream = response.bytes_stream();
+
+            let should_parse = real_path.to_string_lossy().ends_with(".html");
+            let mut buf: Vec<u8> = Vec::new();
+
+            if skip_download && should_parse {
+                // since we are skipping the download we will just read the file off the disk to
+                // parse it
+                if let Ok(mut file) = tokio::fs::OpenOptions::new()
+                    .read(true)
+                    .open(&real_path).await
+                {
+                    if let Err(err) = file.read_to_end(&mut buf).await {
+                        warn!("Failed to read file off disk for parsing, {}", err);
+                    }
+                }
+            }
+
+            // !!!DOWNLOADING TIME!!!
+            if !skip_download {
+                let mut writer = BufWriter::new(file);
+
+                // Write file to disk
+                trace!("Writing at: {:?}", tmp_path);
+                BEING_STREAMED.add(1, &[]);
+                // let mut stream_span = TRACER.start("Stream");
                 while let Some(data) = stream.next().await {
                     match data {
                         Ok(data) => {
-                            debug!("Writing at: {:?}", path);
-                            let _ = file.write_all(&data).await;
+                            TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
+                            let _ = writer.write_all(&data).await;
                             // If we are going to parse this file later, we will save it
                             // into memory as well as the disk.
+                            // We do this because the data here might be incomplete
                             if should_parse {
                                 data.iter().for_each(|f| buf.push(*f));
                             }
-                        },
+                        }
                         Err(err) => {
-                            eprintln!("{}", err)
-                        },
+                            error!("{err}")
+                        }
                     }
                 }
+                let _ = writer.flush().await;
+
+                // rename the temp file into the real file name
+                if let Err(err) = tokio::fs::rename(&tmp_path, &real_path).await {
+                    error!(
+                        from = &*tmp_path.to_string_lossy(),
+                        to = &*real_path.to_string_lossy(),
+                        "Error renaming file: {}",
+                        err
+                    );
+                }
+                // stream_span.end();
+                BEING_STREAMED.add(-1, &[]);
+            }

+            // (If needed) Parse the file
             if should_parse {
+                BEING_PARSED.add(1, &[]);
+                // let mut parsing_span = TRACER.start("Parsing");
+
                 // Parse document and get relationships
                 let sites = parser::parse(&site, &buf).await;

                 // De-duplicate this list
@@ -215,44 +335,81 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                 trace!("Saved {diff} from being entered into the db by de-duping");
                 // Store all the other sites so that we can link to them.
                 let _ = Website::store_all(de_dupe_sites, &db).await;
-            }

-            // METRICS
-            g.decrement(1);
-            counter!(GET_METRIC).increment(1);
+                // parsing_span.end();
+                BEING_PARSED.add(-1, &[]);
+            } else {
+                trace!(url = site.site.as_str(), "Parse = False");
+            }

             // update self in db
-            site.set_crawled();
-            Website::store_all(vec![site], &db).await;
-        } else {
-            error!("File failed to cooperate: {:?}", path);
+            site.crawled = true;
+            site.status_code = code.as_u16();
+            Website::store_all(vec![site.clone()], &db).await;
         }
     } else {
-        error!("Failed to get: {}", &site.site);
+        error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
     }
+
+    // process_span.end();
+    BEING_PROCESSED.add(-1, &[]);
 }

-/// Returns uncrawled links
-#[instrument(skip(db, config))]
-async fn get_uncrawled_links(
-    db: &Surreal<Client>,
-    mut count: usize,
-    filter: String,
-    config: &Config,
-) -> Vec<Website> {
-    if count > config.batch_size {
-        count = config.batch_size;
-    }
-
-    debug!("Getting uncrawled links");
-
-    let mut response = db
-        .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
-        .bind(("format", filter))
-        .bind(("count", count))
-        .await
-        .expect("Hard-coded query failed..?");
-    response
-        .take(0)
-        .expect("Returned websites couldn't be parsed")
-}
+fn load_tracing(config: &Config) -> SdkTracerProvider {
+    // Send spans to Alloy (which will send them to Tempo)
+    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
+        .with_tonic()
+        .with_endpoint(config.tracing_endpoint.clone())
+        .build()
+        .unwrap();
+    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
+        .with_simple_exporter(otlp_span)
+        .build();
+    tracer_provider
+}
+
+fn load_logging(config: &Config) {
+    // let otlp_log = opentelemetry_otlp::LogExporter::builder()
+    //     .with_tonic()
+    //     .with_endpoint(endpoint)
+    //     .build()
+    //     .unwrap();
+    // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+    //     .with_simple_exporter(otlp_log)
+    //     .build();
+    let writer = std::fs::OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open(config.log_file.clone())
+        .expect("Couldn't make log file!");
+
+    let filter = EnvFilter::builder()
+        .with_default_directive(LevelFilter::DEBUG.into())
+        .from_env_lossy();
+
+    let registry = Registry::default().with(
+        fmt::layer()
+            .with_line_number(true)
+            .with_thread_ids(true)
+            .with_file(true)
+            .json()
+            .with_writer(writer)
+            .with_filter(filter),
+    );
+
+    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
+}
+
+fn load_metrics(config: &Config) -> SdkMeterProvider {
+    // Send metrics to Prometheus
+    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
+        .with_http()
+        .with_protocol(Protocol::HttpBinary)
+        .with_endpoint(config.metrics_endpoint.clone())
+        .build()
+        .unwrap();
+    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
+        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
+        .build();
+    metrics_provider
+}
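
Downloads in the new process() stream into <name>.crawl_temp (via the unstable path_add_extension feature) and are renamed over the real path only after the stream completes, so a crash can never leave a truncated file under the final name. A condensed sketch of that shape; it uses with_extension instead of the nightly add_extension, so here the temp suffix replaces rather than appends the extension:

use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};

async fn write_then_rename(real_path: &Path, chunks: Vec<Vec<u8>>) -> std::io::Result<()> {
    let tmp_path = real_path.with_extension("crawl_temp");

    // 1. stream everything into the temp file
    let mut file = fs::File::create(&tmp_path).await?;
    for chunk in chunks {
        file.write_all(&chunk).await?;
    }
    file.flush().await?;

    // 2. only a complete download gets the real name; a same-filesystem
    //    rename is atomic, so readers never observe partial data
    fs::rename(&tmp_path, real_path).await?;
    Ok(())
}

The Content-Length check serves the same goal from the other side: a file already on disk with the advertised byte count is assumed complete and is re-read for parsing instead of re-downloaded.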

src/parser.rs

@@ -4,7 +4,7 @@ use html5ever::tokenizer::{BufferQueue, TokenizerResult};
 use html5ever::tokenizer::{StartTag, TagToken};
 use html5ever::tokenizer::{Token, TokenSink, TokenSinkResult, Tokenizer, TokenizerOpts};
 use html5ever::{local_name, tendril::*};
-use tracing::{debug, error, instrument, trace, warn};
+use tracing::{error, instrument, trace, warn};
 use url::Url;

 use crate::db::Website;
@@ -12,7 +12,6 @@ use crate::db::Website;
 impl TokenSink for Website {
     type Handle = Vec<Website>;

-    #[instrument(skip(token, _line_number))]
     fn process_token(&self, token: Token, _line_number: u64) -> TokenSinkResult<Self::Handle> {
         match token {
             TagToken(tag) => {
@@ -33,13 +32,13 @@ impl TokenSink for Website {
                             let attr_name = attr.name.local.to_string();
                             if attr_name == "src" || attr_name == "href" || attr_name == "data"
                             {
-                                trace!("Found `{}` in html `{}` tag", &attr.value, tag.name);
+                                trace!(url = self.site.as_str(), "Found `{}` in html `{}` tag", &attr.value, tag.name);
                                 let url = try_get_url(&self.site, &attr.value);

                                 if let Some(mut parsed) = url {
                                     parsed.set_query(None);
                                     parsed.set_fragment(None);
-                                    debug!("Final cleaned URL: `{}`", parsed.to_string());
+                                    trace!(url = self.site.as_str(), "Final cleaned URL: `{}`", parsed.to_string());
                                     let web = Website::new(&parsed.to_string(), false);
                                     links.push(web);
                                 }
@@ -60,10 +59,10 @@ impl TokenSink for Website {
     }
 }

-#[instrument(skip_all)]
+#[instrument(skip(data))]
 /// Parses the passed site and returns all the sites it links to.
 pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
-    debug!("Parsing {}", site.site.to_string());
+    trace!(url = site.site.as_str(), "Parsing {}", site.site.to_string());
     // prep work
     let mut other_sites: Vec<Website> = Vec::new();
@@ -88,7 +87,7 @@ pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
         assert!(token_buffer.is_empty());
         tokenizer.end();
     } else {
-        warn!("Tendril failed to parse on: {}", site.site.to_string());
+        warn!(url = site.site.as_str(), "Tendril failed to parse on: {}", site.site.to_string());
     }

     other_sites
@@ -100,7 +99,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
         Ok(ok) => Some(ok),
         Err(e) => {
             if link.starts_with('#') {
-                trace!("Rejecting # url");
+                trace!(url = parent.as_str(), "Rejecting # url");
                 None
             } else if link.starts_with("//") {
                 // if a url starts with "//" is assumed that it will adopt
@@ -108,7 +107,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
                 // https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
                 let scheme = parent.scheme();
-                match Url::parse(&format!("{scheme}://{}", link)) {
+                match Url::parse(&format!("{scheme}://{link}")) {
                     Ok(url) => Some(url),
                     Err(err) => {
                         error!("Failed parsing relative scheme url: {}", err);
@@ -116,7 +115,7 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
                 }
             }
         } else {
-            // # This is some sort of realative url, gonna try patching it up into an absolute
+            // # This is some sort of relative url, gonna try patching it up into an absolute
             // url
             match e {
                 url::ParseError::RelativeUrlWithoutBase => {
@@ -127,16 +126,15 @@ fn try_get_url(parent: &Url, link: &str) -> Option<Url> {
                     }

                     let url = origin.clone() + link;
-                    trace!("Built `{url}` from `{origin} + `{}`", link.to_string());
                     if let Ok(url) = Url::parse(&url) {
-                        trace!("Saved relative url `{}` AS: `{}`", link, url);
+                        trace!(url = parent.as_str(), "Built `{url}` from `{origin} + `{}`", link.to_string());
                         Some(url)
                     } else {
                         error!(
-                            "Failed to reconstruct a url from relative url: `{}` on site: `{}`",
+                            "Failed to reconstruct a url from relative url: `{}` on site: `{}`. Failed url was: {}",
                             link,
-                            parent.to_string()
+                            parent.to_string(),
+                            url
                         );
                         None
                     }
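
The protocol-relative branch leans on WHATWG parsing rules: gluing "{scheme}://" onto a link that already starts with "//" produces doubled slashes, which the url crate tolerates for http(s) and still resolves the host. Restated as a small sketch of that one arm (names illustrative):

use url::Url;

/// Resolve "//host/path" links against the parent's scheme;
/// mirrors the `link.starts_with("//")` arm of try_get_url().
fn resolve_protocol_relative(parent: &Url, link: &str) -> Option<Url> {
    debug_assert!(link.starts_with("//"));
    let scheme = parent.scheme(); // e.g. "https"
    // "https://" + "//cdn.example.com/app.js" still parses to
    // https://cdn.example.com/app.js under WHATWG rules
    Url::parse(&format!("{scheme}://{link}")).ok()
}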

src/setup.surql

@@ -4,6 +4,15 @@ DEFINE FIELD IF NOT EXISTS site ON TABLE website TYPE string;
 DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;

 DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
+DEFINE FIELD IF NOT EXISTS processing ON TABLE website TYPE bool DEFAULT false;
 DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
 DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
+
+DEFINE FUNCTION OVERWRITE fn::get_next($filter: string) {
+    LET $site = SELECT * FROM ONLY website WHERE crawled = false AND processing = false AND site ~ type::string($filter) LIMIT 1;
+    UPDATE $site.id SET processing = true;
+    RETURN $site
+};
+
+UPDATE website SET processing = false WHERE processing = true;
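
Together with the new processing field, fn::get_next implements a claim-then-work queue: it selects one uncrawled, unclaimed row matching the filter, flags it processing = true so concurrent workers skip it, and the final UPDATE clears claims left behind by a crashed run at startup. The Rust side consumes it like this (condensed from get_next() in src/db.rs):

// `db` is a Surreal<Client>; `config.crawl_filter` comes from Crawler.toml
let mut response = db
    .query("fn::get_next($format)")
    .bind(("format", config.crawl_filter.to_string()))
    .await?;
// one claimed Website, or None once nothing matches the filter
let next: Option<Website> = response.take(0)?;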

File diff suppressed because it is too large.