From 9662b68b0db11b8387401f8d0866a218508081cc Mon Sep 17 00:00:00 2001 From: Oliver Date: Thu, 10 Jul 2025 23:44:23 -0600 Subject: [PATCH] traces and new metrics work --- .gitignore | 3 +- Cargo.lock | 419 ++++++++++++++++------------------------- Cargo.toml | 5 +- docker/alloy.conf | 28 ++- docker/compose.yml | 37 +++- docker/grafana.yaml | 17 ++ docker/prometheus.yaml | 22 +-- docker/tempo.yaml | 48 +++++ src/main.rs | 159 ++++++++++------ src/parser.rs | 2 +- 10 files changed, 408 insertions(+), 332 deletions(-) create mode 100644 docker/tempo.yaml diff --git a/.gitignore b/.gitignore index 87fb73a..1bce188 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ perf.data flamegraph.svg perf.data.old /docker/logs/* -/downloaded \ No newline at end of file +/downloaded +/Crawler.toml diff --git a/Cargo.lock b/Cargo.lock index a60854e..b005446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,62 +103,18 @@ dependencies = [ "libc", ] -[[package]] -name = "anstream" -version = "0.6.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - -[[package]] -name = "anstyle" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" - -[[package]] -name = "anstyle-parse" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" -dependencies = [ - "windows-sys 0.59.0", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" -dependencies = [ - "anstyle", - "once_cell", - "windows-sys 0.59.0", -] - [[package]] name = "any_ascii" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea50b14b7a4b9343f8c627a7a53c52076482bd4bdad0a24fd3ec533ed616cc2c" +[[package]] +name = "anyhow" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" + [[package]] name = "approx" version = "0.4.0" @@ -325,17 +281,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-recursion" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.100", -] - [[package]] name = "async-stream" version = "0.3.6" @@ -815,12 +760,6 @@ dependencies = [ "cc", ] -[[package]] -name = "colorchoice" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -871,21 +810,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.4.2" @@ -984,20 +908,6 @@ dependencies = [ "parking_lot_core", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.8.0" @@ -1014,17 +924,6 @@ dependencies = [ "serde", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "deunicode" version = "1.6.1" @@ -1135,29 +1034,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" -[[package]] -name = "env_filter" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1727,6 +1603,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1970,7 +1859,9 @@ dependencies = [ "html5ever 0.29.1", "metrics", "metrics-exporter-prometheus", - "minio", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "reqwest", "serde", "surrealdb", @@ -1987,12 +1878,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" -[[package]] -name = "is_terminal_polyfill" -version = "1.70.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" - [[package]] name = "itertools" version = "0.10.5" @@ -2026,30 +1911,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "jiff" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde", -] - -[[package]] -name = "jiff-static" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.100", -] - [[package]] name = "jobserver" version = "0.1.32" @@ -2294,12 +2155,6 @@ dependencies = [ "digest", ] -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.7.4" @@ -2398,46 +2253,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "minio" -version = "0.2.0-alpha" -source = "git+https://github.com/minio/minio-rs.git?rev=c28f576#c28f576cb8f8cf47fb941bb9db62b2cbd6f080c1" -dependencies = [ - "async-recursion", - "async-trait", - "base64 0.22.1", - "byteorder", - "bytes", - "chrono", - "crc", - "dashmap 6.1.0", - "derivative", - "env_logger", - "futures-util", - "hex", - "hmac", - "home", - "http", - "hyper", - "lazy_static", - "log", - "md5", - "multimap", - "os_info", - "percent-encoding", - "rand 0.8.5", - "regex", - "reqwest", - "serde", - "serde_json", - "sha2", - "tokio", - "tokio-stream", - "tokio-util", - "urlencoding", - "xmltree", -] - [[package]] name = "miniz_oxide" version = "0.8.5" @@ -2475,15 +2290,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "multimap" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -dependencies = [ - "serde", -] - [[package]] name = "nanoid" version = "0.4.0" @@ -2736,14 +2542,77 @@ dependencies = [ ] [[package]] -name = "os_info" -version = "3.10.0" +name = "opentelemetry" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a604e53c24761286860eba4e2c8b23a0161526476b1de520139d69cdb85a6b5" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" dependencies = [ - "log", - "serde", - "windows-sys 0.52.0", + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.12", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.0", + "serde_json", + "thiserror 2.0.12", ] [[package]] @@ -2923,6 +2792,26 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2947,15 +2836,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -3005,6 +2885,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "psl-types" version = "2.0.11" @@ -3366,6 +3269,7 @@ dependencies = [ "base64 0.22.1", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -4196,7 +4100,7 @@ dependencies = [ "cedar-policy", "chrono", "ciborium", - "dashmap 5.5.3", + "dashmap", "deunicode", "dmp", "fst", @@ -4619,6 +4523,32 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -4627,11 +4557,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.8.0", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -4873,12 +4807,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "utf8parse" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" - [[package]] name = "uuid" version = "1.16.0" @@ -5419,21 +5347,6 @@ dependencies = [ "tap", ] -[[package]] -name = "xml-rs" -version = "0.8.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5b940ebc25896e71dd073bad2dbaa2abfe97b0a391415e22ad1326d9c54e3c4" - -[[package]] -name = "xmltree" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b619f8c85654798007fb10afa5125590b43b088c225a25fc2fec100a9fad0fc6" -dependencies = [ - "xml-rs", -] - [[package]] name = "yoke" version = "0.7.5" diff --git a/Cargo.toml b/Cargo.toml index a2ba16f..f1e599b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,9 @@ futures-util = "0.3.31" html5ever = "0.29" metrics = "0.24.1" metrics-exporter-prometheus = { version = "0.16.2", features=["http-listener"]} -# minio = "0.1.0" -minio = {git="https://github.com/minio/minio-rs.git", rev = "c28f576"} +opentelemetry = "0.30.0" +opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "trace", "logs", "grpc-tonic"] } +opentelemetry_sdk = "0.30.0" reqwest = { version = "0.12", features = ["gzip", "default", "rustls-tls", "stream"] } serde = { version = "1.0", features = ["derive"] } surrealdb = "2.2" diff --git a/docker/alloy.conf b/docker/alloy.conf index 35babb3..c2d40e8 100644 --- a/docker/alloy.conf +++ b/docker/alloy.conf @@ -3,12 +3,34 @@ local.file_match "tmplogs" { } loki.source.file "local_files" { - targets = local.file_match.tmplogs.targets - forward_to = [loki.write.local_loki.receiver] + targets = local.file_match.tmplogs.targets + forward_to = [loki.write.local_loki.receiver] } loki.write "local_loki" { endpoint { url = "http://loki:3100/loki/api/v1/push" } -} \ No newline at end of file +} + +otelcol.receiver.otlp "otlp_receiver" { + grpc { + endpoint = "0.0.0.0:4317" + } + http { + endpoint = "0.0.0.0:4318" + } + + output { + traces = [otelcol.exporter.otlp.tempo.input,] + } +} + +otelcol.exporter.otlp "tempo" { + client { + endpoint = "tempo:4317" + tls { + insecure = true + } + } +} diff --git a/docker/compose.yml b/docker/compose.yml index efe16f2..e786a88 100644 --- a/docker/compose.yml +++ b/docker/compose.yml @@ -1,4 +1,6 @@ services: + + # Database surreal: image: surrealdb/surrealdb:latest-dev ports: @@ -15,6 +17,18 @@ services: - root - rocksdb:/mydata/database.db + # Tracing + tempo: + image: grafana/tempo:latest + command: [ "-config.file=/etc/tempo.yaml" ] + volumes: + - ./tempo.yaml:/etc/tempo.yaml + - tempo_storage:/var/tempo + ports: + - 3200:3200 # self metrics for prometheus + - 4317:4317 # otlp grpc - (alloy) + + # Log scraper alloy: image: grafana/alloy:latest ports: @@ -24,9 +38,13 @@ services: - ./logs/:/tmp/alloy-logs - ./alloy.conf:/etc/alloy/config.alloy - alloy_storage:/var/lib/alloy - command: run --server.http.listen-addr=0.0.0.0:12345 --storage.path=/var/lib/alloy/data /etc/alloy/config.alloy + command: + - run + - --server.http.listen-addr=0.0.0.0:12345 + - --storage.path=/var/lib/alloy/data + - /etc/alloy/config.alloy - #logs + # Log storage / analysis loki: image: grafana/loki:latest ports: @@ -35,16 +53,20 @@ services: volumes: - ./loki.yaml:/etc/loki/local-config.yaml - # Metrics collector + # Metrics prometheus: image: prom/prometheus:latest - expose: - - 9090 + ports: + - 9090:9090 volumes: - ./prometheus.yaml:/etc/prometheus/prometheus.yml # persist data - - prometheus_storage:/prometheus - command: --web.enable-lifecycle --config.file=/etc/prometheus/prometheus.yml + # - prometheus_storage:/prometheus + command: + - --enable-feature=native-histograms + - --web.enable-remote-write-receiver + - --web.enable-lifecycle + - --config.file=/etc/prometheus/prometheus.yml # Everything viewer grafana: @@ -66,3 +88,4 @@ volumes: grafana_storage: alloy_storage: surrealdb_storage: + tempo_storage: diff --git a/docker/grafana.yaml b/docker/grafana.yaml index ac68607..91c70cb 100644 --- a/docker/grafana.yaml +++ b/docker/grafana.yaml @@ -22,3 +22,20 @@ datasources: editable: false jsonData: httpMethod: GET +- name: Tempo + type: tempo + access: proxy + orgId: 1 + url: http://tempo:3200 + basicAuth: false + isDefault: false + version: 1 + editable: true + apiVersion: 1 + uid: tempo + jsonData: + httpMethod: GET + serviceMap: + datasourceUid: prometheus + streamingEnabled: + search: true diff --git a/docker/prometheus.yaml b/docker/prometheus.yaml index ffc1e24..968696a 100644 --- a/docker/prometheus.yaml +++ b/docker/prometheus.yaml @@ -1,17 +1,15 @@ global: - scrape_interval: 5s + scrape_interval: 60s query_log_file: /etc/prometheus/query.log scrape_configs: - - job_name: crawler + # Crawler configs get pushed with OTLP +# - job_name: 'loki' +# static_configs: +# - targets: ['loki:3100'] +# - job_name: 'prometheus' +# static_configs: +# - targets: ['localhost:9090'] + - job_name: 'tempo' static_configs: - # change this your machine's ip, localhost won't work - # because localhost refers to the docker container. - - targets: ['172.20.239.48:2500'] - #- targets: ['192.168.8.209:2500'] - - job_name: loki - static_configs: - - targets: ['loki:3100'] - - job_name: prometheus - static_configs: - - targets: ['localhost:9090'] + - targets: ['tempo:3200'] diff --git a/docker/tempo.yaml b/docker/tempo.yaml new file mode 100644 index 0000000..b09caba --- /dev/null +++ b/docker/tempo.yaml @@ -0,0 +1,48 @@ +stream_over_http_enabled: true +server: + http_listen_port: 3200 + log_level: info + +query_frontend: + search: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + metadata_slo: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + trace_by_id: + duration_slo: 5s + +distributor: + receivers: + otlp: + protocols: + grpc: + endpoint: "tempo:4317" + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /var/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + traces_storage: + path: /var/tempo/generator/traces + +storage: + trace: + backend: local # backend configuration to use + wal: + path: /var/tempo/wal # where to store the wal locally + local: + path: /var/tempo/blocks + +overrides: + defaults: + metrics_generator: + processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator + generate_native_histograms: both diff --git a/src/main.rs b/src/main.rs index 289f351..d5e5d50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,32 +4,59 @@ extern crate html5ever; -use futures_util::StreamExt; - use std::{ - collections::HashSet, - fs::File, - io::Read, - net::{IpAddr, Ipv4Addr}, + collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}, sync::LazyLock, time::Instant }; +use futures_util::StreamExt; +use opentelemetry::{global::{self, BoxedTracer}, metrics::{Counter, Meter, UpDownCounter}, trace::{Span, Tracer}}; +use opentelemetry_otlp::{Protocol, WithExportConfig}; use db::{connect, Website}; -use metrics::{counter, gauge}; -use metrics_exporter_prometheus::PrometheusBuilder; use serde::Deserialize; use surrealdb::{engine::remote::ws::Client, Surreal}; use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet}; -use tracing::{debug, debug_span, error, info, instrument, level_filters::LevelFilter, trace, trace_span, warn}; +use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn}; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry}; mod db; mod filesystem; mod parser; -const GET_METRIC: &str = "total_gets"; -const GET_IN_FLIGHT: &str = "gets_in_flight"; -const SITES_CRAWLED: &str = "pages_crawled"; -const BEING_PROCESSED: &str = "pages_being_processed"; +// TODO prefix all of these with "crawler" or something + +static METER: LazyLock = LazyLock::new(|| global::meter("Internet_Mapper")); +static BEING_PROCESSED: LazyLock> = LazyLock::new(|| + METER + .i64_up_down_counter("pages_being_processed") + .build() +); +static BEING_PARSED: LazyLock> = LazyLock::new(|| + METER + .i64_up_down_counter("pages_being_parsed") + .build() +); +static BEING_STREAMED: LazyLock> = LazyLock::new(|| + METER + .i64_up_down_counter("pages_being_streamed") + .build() +); +static GET_IN_FLIGHT: LazyLock> = LazyLock::new(|| + METER + .i64_up_down_counter("gets_in_flight") + .build() +); +static TOTAL_BYTES_DOWN: LazyLock> = LazyLock::new(|| + METER + .u64_counter("total_bytes_down") + .build() +); +static SITES_CRAWLED: LazyLock> = LazyLock::new(|| + METER + .u64_counter("total_sites_crawled") + .build() +); + +static TRACER: LazyLock = LazyLock::new(|| global::tracer("Internet_Mapper")); #[derive(Deserialize)] struct Config { @@ -49,6 +76,43 @@ struct Config { async fn main() { println!("Logs and metrics are provided to the Grafana dashboard"); + // Start TRACE / LOGGING / METRICS +// let otlp_log = opentelemetry_otlp::LogExporter::builder() +// .with_tonic() +// .with_endpoint(endpoint) +// .build() +// .unwrap(); + // Send metrics to Prometheus + let otlp_metrics = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .with_endpoint("http://localhost:9090/api/v1/otlp/v1/metrics") + .build() + .unwrap(); + // Send spans to Alloy (which will send them to Tempo) + let otlp_span = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint("http://localhost:4317") + .build() + .unwrap(); + + let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_simple_exporter(otlp_span) + .build(); + // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder() + // .with_simple_exporter(otlp_log) + // .build(); + let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() + .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15 + .build(); + + global::set_tracer_provider(tracer_provider); + global::set_meter_provider(metrics_provider); + // How to set logger? + + // End TRACE + + // Start LOGGING let writer = std::fs::OpenOptions::new() .append(true) .create(true) @@ -70,15 +134,7 @@ async fn main() { ); tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber"); - - let builder = PrometheusBuilder::new(); - builder - .with_http_listener(std::net::SocketAddr::new( - IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])), - 2500, - )) - .install() - .expect("failed to install recorder/exporter"); + // End LOGGING info!("Starting..."); @@ -106,16 +162,13 @@ async fn main() { // Kick off the whole machine - This Website object doesn't matter, it's just to allow for // get() to work. - let span = trace_span!("Pre-Loop"); - let pre_loop_span = span.enter(); - // Download the site + let mut span = TRACER.start("Pre-Loop"); let site = Website::new(starting_url, false); process(site, db.clone(), reqwest.clone()).await; + span.end(); + // Download the site - drop(pre_loop_span); - - let span = trace_span!("Loop"); - let span = span.enter(); + let mut main_loop_span= TRACER.start("Main-Loop"); while crawled < config.budget { let uncrawled = get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await; @@ -127,22 +180,17 @@ async fn main() { { let mut futures = JoinSet::new(); for site in uncrawled { - gauge!(BEING_PROCESSED).increment(1); futures.spawn(process(site, db.clone(), reqwest.clone())); - // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32); - // info!("Crawled {crawled} out of {budget} pages. ({percent})"); } - let c = counter!(SITES_CRAWLED); // As futures complete runs code in while block while futures.join_next().await.is_some() { - c.increment(1); - gauge!(BEING_PROCESSED).decrement(1); + SITES_CRAWLED.add(1, &[]); crawled += 1; } } } - drop(span); + main_loop_span.end(); if let Ok(mut ok) = db .query("count(select id from website where crawled = true)") @@ -161,17 +209,20 @@ async fn main() { /// Downloads and crawls and stores a webpage. /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Client) { + // METRICS trace!("Process: {}", &site.site); + BEING_PROCESSED.add(1, &[]); + let mut process_span = TRACER.start("Process"); + // Build the request let request_builder = reqwest.get(site.site.to_string()); - - // METRICS - let g = gauge!(GET_IN_FLIGHT); - g.increment(1); - + // Send the http request (get) + GET_IN_FLIGHT.add(1, &[]); if let Ok(response) = request_builder.send().await { + GET_IN_FLIGHT.add(-1, &[]); + let headers = response.headers(); let code = response.status(); @@ -203,10 +254,13 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien let mut buf: Vec = Vec::new(); // Write file to disk - info!("Writing at: {:?}", path); + trace!("Writing at: {:?}", path); + BEING_STREAMED.add(1, &[]); + let mut stream_span = TRACER.start("Stream"); while let Some(data) = stream.next().await { match data { Ok(data) => { + TOTAL_BYTES_DOWN.add(data.len() as u64, &[]); let _ = writer.write_all(&data).await; // If we are going to parse this file later, we will save it // into memory as well as the disk. @@ -220,13 +274,14 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien }, } } + stream_span.end(); + BEING_STREAMED.add(-1, &[]); let _ = writer.flush(); - // (If needed) Parse the file if should_parse { - let span = debug_span!("Should Parse"); - let enter = span.enter(); + BEING_PARSED.add(1, &[]); + let mut parsing_span = TRACER.start("Parsing"); // Parse document and get relationships let sites = parser::parse(&site, &buf).await; @@ -242,13 +297,10 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien // Store all the other sites so that we can link to them. let _ = Website::store_all(de_dupe_sites, &db).await; - drop(enter); + parsing_span.end(); + BEING_PARSED.add(-1, &[]); } - // METRICS - g.decrement(1); - counter!(GET_METRIC).increment(1); - // update self in db site.crawled = true; site.status_code = code.as_u16(); @@ -256,11 +308,12 @@ async fn process(mut site: Website, db: Surreal, reqwest: reqwest::Clien } else { error!("File failed to cooperate: {:?}", path); } - - trace!("Done processing: {}", &site.site); } else { error!("Failed to get: {}", &site.site); } + + process_span.end(); + BEING_PROCESSED.add(-1, &[]); } /// Returns uncrawled links @@ -275,7 +328,7 @@ async fn get_uncrawled_links( count = config.batch_size; } - debug!("Getting uncrawled links"); + debug!("Getting {} uncrawled links from DB", count); let mut response = db .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;") diff --git a/src/parser.rs b/src/parser.rs index 908d21f..2d004b8 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -63,7 +63,7 @@ impl TokenSink for Website { #[instrument(skip_all)] /// Parses the passed site and returns all the sites it links to. pub async fn parse(site: &Website, data: &[u8]) -> Vec { - debug!("Parsing {}", site.site.to_string()); + trace!("Parsing {}", site.site.to_string()); // prep work let mut other_sites: Vec = Vec::new();