Compare commits

...

2 Commits

SHA1        Message          Date
f3a51065b5  remove fixme     2025-07-17 09:37:03 -06:00
343d3a7570  better logging   2025-07-17 09:36:37 -06:00
3 changed files with 23 additions and 15 deletions

File 1 of 3

@@ -59,7 +59,6 @@ impl Website {
         counter!(STORE).increment(1);
         let mut things = Vec::with_capacity(all.len());
-        // FIXME fails *sometimes* because "Resource Busy"
         match db
             .query(
                 "INSERT INTO website $array

File 2 of 3

@@ -36,12 +36,16 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
 pub async fn init(filename: &PathBuf) -> Option<fs::File> {
     let file = async || tokio::fs::OpenOptions::new()
+        .write(true)
         .append(true)
         .create(true)
         .open(&filename).await;
     match file().await {
-        Ok(ok) => Some(ok),
+        Ok(ok) => {
+            trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
+            Some(ok)
+        },
         Err(err) => {
             // the file/folder isn't found
             if err.kind() == ErrorKind::NotFound {
@@ -55,6 +59,14 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
                 } else {
                     error!("Couldn't get file's parents: {:?}", &filename);
                 }
+            } else if err.kind() == ErrorKind::NotADirectory {
+                // Example:
+                // 1. example.com/user
+                // 2. example.com/user/post
+                // If file 1 exists it will prevent file 2 from existing
+                // FIXME
+                error!("One of the parent directories is actually a file...")
             } else {
                 error!("File open error: {err} {:?}", filename);
             }
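The new ErrorKind::NotADirectory branch only logs the problem and leaves a FIXME: once example.com/user has been saved as a file, nothing under example.com/user/ can be written. A minimal, hypothetical sketch of one way around it (not what this patch does) is to remap any ancestor that already exists as a plain file; the helper name and the "_" suffix are assumptions for illustration:

use std::path::{Path, PathBuf};

// Hypothetical helper, not part of this change: if an ancestor of `target`
// already exists as a regular file, rename that component (here by appending
// "_") so the rest of the directory tree can still be created.
fn remap_conflicting_path(target: &Path) -> PathBuf {
    let mut rebuilt = PathBuf::new();
    for component in target.components() {
        rebuilt.push(component.as_os_str());
        // A component that is already a file cannot also become a directory,
        // so steer everything below it under a sibling name instead.
        if rebuilt != target && rebuilt.is_file() {
            let name = rebuilt
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("page")
                .to_owned();
            rebuilt.set_file_name(format!("{name}_"));
        }
    }
    rebuilt
}

fn main() {
    // If "downloads/example.com/user" exists as a file, the post below would
    // be written to "downloads/example.com/user_/post" instead.
    let out = remap_conflicting_path(Path::new("downloads/example.com/user/post"));
    println!("{}", out.display());
}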

File 3 of 3

@@ -12,6 +12,7 @@ use futures_util::StreamExt;
 use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
 use opentelemetry_otlp::{Protocol, WithExportConfig};
 use db::{connect, Website};
+use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
 use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
@@ -91,9 +92,9 @@ async fn main() {
     println!("Logs and metrics are provided to the Grafana dashboard");
     // Start TRACE / LOGGING / METRICS
-    load_tracing(&CONFIG);
-    load_logging(&CONFIG);
-    load_metrics(&CONFIG);
+    load_logging(&CONFIG); // this seems to be working ok
+    global::set_tracer_provider(load_tracing(&CONFIG));
+    global::set_meter_provider(load_metrics(&CONFIG));
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
     // let crawl_filter = "en.wikipedia.org/";
@@ -263,25 +264,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client
             // parsing_span.end();
             BEING_PARSED.add(-1, &[]);
         } else {
-            trace!("Did not parse: {}", site.site.as_str());
+            trace!(url = site.site.as_str(), "Parse = False");
         }
         // update self in db
         site.crawled = true;
         site.status_code = code.as_u16();
         Website::store_all(vec![site.clone()], &db).await;
-        } else {
-            error!("File failed to cooperate: {:?}", path);
         }
     } else {
-        error!("Failed to get: {}", &site.site);
+        error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
     }
     // process_span.end();
     BEING_PROCESSED.add(-1, &[]);
 }

-fn load_tracing(config: &Config) {
+fn load_tracing(config: &Config) -> SdkTracerProvider {
     // Send spans to Alloy (which will send them to Tempo)
     let otlp_span = opentelemetry_otlp::SpanExporter::builder()
         .with_tonic()
@@ -291,8 +290,7 @@ fn load_tracing(config: &Config) {
     let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
         .with_simple_exporter(otlp_span)
         .build();
-    global::set_tracer_provider(tracer_provider);
+    tracer_provider
 }

 fn load_logging(config: &Config) {
@@ -327,7 +325,7 @@ fn load_logging(config: &Config) {
     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
 }

-fn load_metrics(config: &Config) {
+fn load_metrics(config: &Config) -> SdkMeterProvider {
     // Send metrics to Prometheus
     let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
         .with_http()
@@ -338,8 +336,7 @@ fn load_metrics(config: &Config) {
     let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
         .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
         .build();
-    global::set_meter_provider(metrics_provider);
+    metrics_provider
 }
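Taken together, the main.rs changes switch from load_* functions that register providers themselves to functions that return SdkTracerProvider / SdkMeterProvider, leaving registration to the caller. A minimal sketch of that pattern follows; the Config plumbing and OTLP exporter setup from the real code are elided, and the shutdown calls at the end are an assumption about how the returned handles might be used, not something this patch adds:

use opentelemetry::global;
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};

// Sketch only: the real load_tracing/load_metrics also build OTLP exporters
// from `Config` and attach them with .with_simple_exporter(...) /
// .with_periodic_exporter(...).
fn load_tracing() -> SdkTracerProvider {
    SdkTracerProvider::builder().build()
}

fn load_metrics() -> SdkMeterProvider {
    SdkMeterProvider::builder().build()
}

fn main() {
    // Register the providers globally, as the patched main() does...
    let tracer_provider = load_tracing();
    let meter_provider = load_metrics();
    global::set_tracer_provider(tracer_provider.clone());
    global::set_meter_provider(meter_provider.clone());

    // ...while keeping the handles, so buffered spans/metrics can be flushed
    // on the way out (assumed usage, not part of this change).
    let _ = tracer_provider.shutdown();
    let _ = meter_provider.shutdown();
}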