traces and new metrics work
src/main.rs | 159
@@ -4,32 +4,59 @@
 extern crate html5ever;
 
-use futures_util::StreamExt;
-
 use std::{
-    collections::HashSet,
-    fs::File,
-    io::Read,
-    net::{IpAddr, Ipv4Addr},
+    collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}, sync::LazyLock, time::Instant
 };
 
+use futures_util::StreamExt;
+use opentelemetry::{global::{self, BoxedTracer}, metrics::{Counter, Meter, UpDownCounter}, trace::{Span, Tracer}};
+use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
-use metrics::{counter, gauge};
-use metrics_exporter_prometheus::PrometheusBuilder;
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
 use tokio::{io::{AsyncWriteExt, BufWriter}, task::JoinSet};
-use tracing::{debug, debug_span, error, info, instrument, level_filters::LevelFilter, trace, trace_span, warn};
+use tracing::{debug, error, info, instrument, level_filters::LevelFilter, trace, warn};
 use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};
 
 mod db;
 mod filesystem;
 mod parser;
 
-const GET_METRIC: &str = "total_gets";
-const GET_IN_FLIGHT: &str = "gets_in_flight";
-const SITES_CRAWLED: &str = "pages_crawled";
-const BEING_PROCESSED: &str = "pages_being_processed";
 // TODO prefix all of these with "crawler" or something
 
+static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("Internet_Mapper"));
+static BEING_PROCESSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+    METER
+        .i64_up_down_counter("pages_being_processed")
+        .build()
+);
+static BEING_PARSED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+    METER
+        .i64_up_down_counter("pages_being_parsed")
+        .build()
+);
+static BEING_STREAMED: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+    METER
+        .i64_up_down_counter("pages_being_streamed")
+        .build()
+);
+static GET_IN_FLIGHT: LazyLock<UpDownCounter<i64>> = LazyLock::new(||
+    METER
+        .i64_up_down_counter("gets_in_flight")
+        .build()
+);
+static TOTAL_BYTES_DOWN: LazyLock<Counter<u64>> = LazyLock::new(||
+    METER
+        .u64_counter("total_bytes_down")
+        .build()
+);
+static SITES_CRAWLED: LazyLock<Counter<u64>> = LazyLock::new(||
+    METER
+        .u64_counter("total_sites_crawled")
+        .build()
+);
+
+static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("Internet_Mapper"));
 
 #[derive(Deserialize)]
 struct Config {
@@ -49,6 +76,43 @@ struct Config {
 async fn main() {
     println!("Logs and metrics are provided to the Grafana dashboard");
 
+    // Start TRACE / LOGGING / METRICS
+    // let otlp_log = opentelemetry_otlp::LogExporter::builder()
+    //     .with_tonic()
+    //     .with_endpoint(endpoint)
+    //     .build()
+    //     .unwrap();
+    // Send metrics to Prometheus
+    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
+        .with_http()
+        .with_protocol(Protocol::HttpBinary)
+        .with_endpoint("http://localhost:9090/api/v1/otlp/v1/metrics")
+        .build()
+        .unwrap();
+    // Send spans to Alloy (which will send them to Tempo)
+    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
+        .with_tonic()
+        .with_endpoint("http://localhost:4317")
+        .build()
+        .unwrap();
+
+    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
+        .with_simple_exporter(otlp_span)
+        .build();
+    // let logger_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+    //     .with_simple_exporter(otlp_log)
+    //     .build();
+    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
+        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
+        .build();
+
+    global::set_tracer_provider(tracer_provider);
+    global::set_meter_provider(metrics_provider);
+    // How to set logger?
+
+    // End TRACE
+
     // Start LOGGING
     let writer = std::fs::OpenOptions::new()
         .append(true)
         .create(true)
@@ -70,15 +134,7 @@ async fn main() {
     );
 
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
 
-    let builder = PrometheusBuilder::new();
-    builder
-        .with_http_listener(std::net::SocketAddr::new(
-            IpAddr::V4(Ipv4Addr::from_octets([0, 0, 0, 0])),
-            2500,
-        ))
-        .install()
-        .expect("failed to install recorder/exporter");
     // End LOGGING
 
     info!("Starting...");
@@ -106,16 +162,13 @@ async fn main() {
     // Kick off the whole machine - This Website object doesn't matter, it's just to allow for
     // get() to work.
-    let span = trace_span!("Pre-Loop");
-    let pre_loop_span = span.enter();
-    // Download the site
+    let mut span = TRACER.start("Pre-Loop");
     let site = Website::new(starting_url, false);
     process(site, db.clone(), reqwest.clone()).await;
+    span.end();
+    // Download the site
 
-    drop(pre_loop_span);
-
-    let span = trace_span!("Loop");
-    let span = span.enter();
+    let mut main_loop_span = TRACER.start("Main-Loop");
     while crawled < config.budget {
         let uncrawled =
             get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone(), &config).await;
@@ -127,22 +180,17 @@ async fn main() {
         {
             let mut futures = JoinSet::new();
             for site in uncrawled {
-                gauge!(BEING_PROCESSED).increment(1);
                 futures.spawn(process(site, db.clone(), reqwest.clone()));
-                // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
-                // info!("Crawled {crawled} out of {budget} pages. ({percent})");
             }
 
-            let c = counter!(SITES_CRAWLED);
             // As futures complete runs code in while block
             while futures.join_next().await.is_some() {
-                c.increment(1);
-                gauge!(BEING_PROCESSED).decrement(1);
+                SITES_CRAWLED.add(1, &[]);
                 crawled += 1;
             }
         }
     }
-    drop(span);
+    main_loop_span.end();
 
     if let Ok(mut ok) = db
         .query("count(select id from website where crawled = true)")
@@ -161,17 +209,20 @@ async fn main() {
 /// Downloads and crawls and stores a webpage.
 /// It is acceptable to clone `db`, `reqwest`, and `s3` because they all use `Arc`s internally. - Noted by Oliver
 async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
 
-    // METRICS
     trace!("Process: {}", &site.site);
+    BEING_PROCESSED.add(1, &[]);
+    let mut process_span = TRACER.start("Process");
 
     // Build the request
     let request_builder = reqwest.get(site.site.to_string());
 
-    // METRICS
-    let g = gauge!(GET_IN_FLIGHT);
-    g.increment(1);
-
     // Send the http request (get)
+    GET_IN_FLIGHT.add(1, &[]);
     if let Ok(response) = request_builder.send().await {
+        GET_IN_FLIGHT.add(-1, &[]);
 
         let headers = response.headers();
         let code = response.status();
@@ -203,10 +254,13 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
             let mut buf: Vec<u8> = Vec::new();
 
             // Write file to disk
-            info!("Writing at: {:?}", path);
+            trace!("Writing at: {:?}", path);
+            BEING_STREAMED.add(1, &[]);
+            let mut stream_span = TRACER.start("Stream");
             while let Some(data) = stream.next().await {
                 match data {
                     Ok(data) => {
+                        TOTAL_BYTES_DOWN.add(data.len() as u64, &[]);
                         let _ = writer.write_all(&data).await;
                         // If we are going to parse this file later, we will save it
                         // into memory as well as the disk.
@@ -220,13 +274,14 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                     },
                 }
             }
+            stream_span.end();
+            BEING_STREAMED.add(-1, &[]);
             let _ = writer.flush();
 
 
             // (If needed) Parse the file
             if should_parse {
-                let span = debug_span!("Should Parse");
-                let enter = span.enter();
+                BEING_PARSED.add(1, &[]);
+                let mut parsing_span = TRACER.start("Parsing");
 
                 // Parse document and get relationships
                 let sites = parser::parse(&site, &buf).await;
@@ -242,13 +297,10 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
                 // Store all the other sites so that we can link to them.
                 let _ = Website::store_all(de_dupe_sites, &db).await;
 
-                drop(enter);
+                parsing_span.end();
+                BEING_PARSED.add(-1, &[]);
             }
 
-            // METRICS
-            g.decrement(1);
-            counter!(GET_METRIC).increment(1);
-
             // update self in db
             site.crawled = true;
             site.status_code = code.as_u16();
@@ -256,11 +308,12 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
         } else {
             error!("File failed to cooperate: {:?}", path);
         }
 
         trace!("Done processing: {}", &site.site);
     } else {
         error!("Failed to get: {}", &site.site);
     }
 
+    process_span.end();
+    BEING_PROCESSED.add(-1, &[]);
 }
 
 /// Returns uncrawled links
@@ -275,7 +328,7 @@ async fn get_uncrawled_links(
         count = config.batch_size;
     }
 
-    debug!("Getting uncrawled links");
+    debug!("Getting {} uncrawled links from DB", count);
 
     let mut response = db
         .query("SELECT * FROM website WHERE crawled = false AND site ~ type::string($format) LIMIT $count;")
src/parser.rs

@@ -63,7 +63,7 @@ impl TokenSink for Website
 #[instrument(skip_all)]
 /// Parses the passed site and returns all the sites it links to.
 pub async fn parse(site: &Website, data: &[u8]) -> Vec<Website> {
-    debug!("Parsing {}", site.site.to_string());
+    trace!("Parsing {}", site.site.to_string());
     // prep work
     let mut other_sites: Vec<Website> = Vec::new();
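
Aside: a minimal, self-contained sketch of the instrumentation pattern this commit adopts. The names "sketch", "work_in_flight", and do_work are illustrative placeholders, not part of the commit; the calls themselves (global::meter, i64_up_down_counter(...).build(), TRACER.start, span.end, add) mirror the diff above, and the sketch assumes an opentelemetry crate version whose instrument builders expose .build(), as the committed code does.

use std::sync::LazyLock;

use opentelemetry::{
    global::{self, BoxedTracer},
    metrics::{Meter, UpDownCounter},
    trace::{Span, Tracer},
};

// Process-wide meter and tracer. LazyLock defers creation to first
// use, so each resolves against whatever global provider is
// installed by the time it is first touched.
static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("sketch"));
static TRACER: LazyLock<BoxedTracer> = LazyLock::new(|| global::tracer("sketch"));

// An UpDownCounter tracks a level (work currently in flight); a
// plain Counter is monotonic and only ever increases.
static IN_FLIGHT: LazyLock<UpDownCounter<i64>> =
    LazyLock::new(|| METER.i64_up_down_counter("work_in_flight").build());

fn do_work() {
    // Manual span and level bookkeeping, mirroring the commit's
    // TRACER.start / span.end and add(1) / add(-1) pairs.
    let mut span = TRACER.start("do_work");
    IN_FLIGHT.add(1, &[]);
    // ... the actual work would happen here ...
    IN_FLIGHT.add(-1, &[]);
    span.end();
}

fn main() {
    // With no providers installed, the global meter and tracer are
    // no-ops: this compiles and runs but records nothing. The
    // commit's main() installs OTLP-backed providers first, which is
    // what makes the statics live.
    do_work();
}

One consequence of the LazyLock idiom worth noting: as I understand the global API, a static first touched before global::set_meter_provider runs captures the no-op meter for good, so the provider wiring at the top of main() has to stay ahead of any metric use.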