Compare commits
2 Commits
e535bcc295
...
f3a51065b5
Author | SHA1 | Date | |
---|---|---|---|
f3a51065b5 | |||
343d3a7570 |
@ -59,7 +59,6 @@ impl Website {
|
||||
counter!(STORE).increment(1);
|
||||
let mut things = Vec::with_capacity(all.len());
|
||||
|
||||
// FIXME fails *sometimes* because "Resource Busy"
|
||||
match db
|
||||
.query(
|
||||
"INSERT INTO website $array
|
||||
|
@ -36,12 +36,16 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
|
||||
|
||||
pub async fn init(filename: &PathBuf) -> Option<fs::File> {
|
||||
let file = async || tokio::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.append(true)
|
||||
.create(true)
|
||||
.open(&filename).await;
|
||||
|
||||
match file().await {
|
||||
Ok(ok) => Some(ok),
|
||||
Ok(ok) => {
|
||||
trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
|
||||
Some(ok)
|
||||
},
|
||||
Err(err) => {
|
||||
// the file/folder isn't found
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
@ -55,6 +59,14 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
|
||||
} else {
|
||||
error!("Couldn't get file's parents: {:?}", &filename);
|
||||
}
|
||||
} else if err.kind() == ErrorKind::NotADirectory {
|
||||
// Example:
|
||||
// 1. example.com/user
|
||||
// 2. example.com/user/post
|
||||
// If file 1 exists it will prevent file 2 from existing
|
||||
// FIXME
|
||||
|
||||
error!("One of the parent directories is actually a file...")
|
||||
} else {
|
||||
error!("File open error: {err} {:?}", filename);
|
||||
}
|
||||
|
23
src/main.rs
23
src/main.rs
@ -12,6 +12,7 @@ use futures_util::StreamExt;
|
||||
use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
|
||||
use opentelemetry_otlp::{Protocol, WithExportConfig};
|
||||
use db::{connect, Website};
|
||||
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
|
||||
use serde::Deserialize;
|
||||
use surrealdb::{engine::remote::ws::Client, Surreal};
|
||||
use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
|
||||
@ -91,9 +92,9 @@ async fn main() {
|
||||
println!("Logs and metrics are provided to the Grafana dashboard");
|
||||
|
||||
// Start TRACE / LOGGING / METRICS
|
||||
load_tracing(&CONFIG);
|
||||
load_logging(&CONFIG);
|
||||
load_metrics(&CONFIG);
|
||||
load_logging(&CONFIG); // this seems to be working ok
|
||||
global::set_tracer_provider(load_tracing(&CONFIG));
|
||||
global::set_meter_provider(load_metrics(&CONFIG));
|
||||
|
||||
// When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
|
||||
// let crawl_filter = "en.wikipedia.org/";
|
||||
@ -263,25 +264,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
|
||||
// parsing_span.end();
|
||||
BEING_PARSED.add(-1, &[]);
|
||||
} else {
|
||||
trace!("Did not parse: {}", site.site.as_str());
|
||||
trace!(url = site.site.as_str(), "Parse = False");
|
||||
}
|
||||
|
||||
// update self in db
|
||||
site.crawled = true;
|
||||
site.status_code = code.as_u16();
|
||||
Website::store_all(vec![site.clone()], &db).await;
|
||||
} else {
|
||||
error!("File failed to cooperate: {:?}", path);
|
||||
}
|
||||
} else {
|
||||
error!("Failed to get: {}", &site.site);
|
||||
error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
|
||||
}
|
||||
|
||||
// process_span.end();
|
||||
BEING_PROCESSED.add(-1, &[]);
|
||||
}
|
||||
|
||||
fn load_tracing(config: &Config) {
|
||||
fn load_tracing(config: &Config) -> SdkTracerProvider {
|
||||
// Send spans to Alloy (which will send them to Tempo)
|
||||
let otlp_span = opentelemetry_otlp::SpanExporter::builder()
|
||||
.with_tonic()
|
||||
@ -291,8 +290,7 @@ fn load_tracing(config: &Config) {
|
||||
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
|
||||
.with_simple_exporter(otlp_span)
|
||||
.build();
|
||||
|
||||
global::set_tracer_provider(tracer_provider);
|
||||
tracer_provider
|
||||
}
|
||||
|
||||
fn load_logging(config: &Config) {
|
||||
@ -327,7 +325,7 @@ fn load_logging(config: &Config) {
|
||||
tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
|
||||
}
|
||||
|
||||
fn load_metrics(config: &Config) {
|
||||
fn load_metrics(config: &Config) -> SdkMeterProvider {
|
||||
// Send metrics to Prometheus
|
||||
let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
|
||||
.with_http()
|
||||
@ -338,8 +336,7 @@ fn load_metrics(config: &Config) {
|
||||
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
|
||||
.with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
|
||||
.build();
|
||||
|
||||
global::set_meter_provider(metrics_provider);
|
||||
metrics_provider
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user