Compare commits
	
		
			2 Commits
		
	
	
		
			e535bcc295
			...
			f3a51065b5
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| f3a51065b5 | |||
| 343d3a7570 | 
@@ -59,7 +59,6 @@ impl Website {
 | 
				
			|||||||
        counter!(STORE).increment(1);
 | 
					        counter!(STORE).increment(1);
 | 
				
			||||||
        let mut things = Vec::with_capacity(all.len());
 | 
					        let mut things = Vec::with_capacity(all.len());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // FIXME fails *sometimes* because "Resource Busy"
 | 
					 | 
				
			||||||
        match db
 | 
					        match db
 | 
				
			||||||
            .query(
 | 
					            .query(
 | 
				
			||||||
                "INSERT INTO website $array
 | 
					                "INSERT INTO website $array
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -36,12 +36,16 @@ pub fn as_path(url: &Url, content_type: &HeaderValue) -> PathBuf {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
pub async fn init(filename: &PathBuf) -> Option<fs::File> {
 | 
					pub async fn init(filename: &PathBuf) -> Option<fs::File> {
 | 
				
			||||||
    let file = async || tokio::fs::OpenOptions::new()
 | 
					    let file = async || tokio::fs::OpenOptions::new()
 | 
				
			||||||
 | 
					        .write(true)
 | 
				
			||||||
        .append(true)
 | 
					        .append(true)
 | 
				
			||||||
        .create(true)
 | 
					        .create(true)
 | 
				
			||||||
        .open(&filename).await;
 | 
					        .open(&filename).await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    match file().await {
 | 
					    match file().await {
 | 
				
			||||||
        Ok(ok) => Some(ok),
 | 
					        Ok(ok) => {
 | 
				
			||||||
 | 
					            trace!("Initialized file {}", filename.to_str().unwrap_or("N/A"));
 | 
				
			||||||
 | 
					            Some(ok)
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
        Err(err) => {
 | 
					        Err(err) => {
 | 
				
			||||||
            // the file/folder isn't found
 | 
					            // the file/folder isn't found
 | 
				
			||||||
            if err.kind() == ErrorKind::NotFound {
 | 
					            if err.kind() == ErrorKind::NotFound {
 | 
				
			||||||
@@ -55,6 +59,14 @@ pub async fn init(filename: &PathBuf) -> Option<fs::File> {
 | 
				
			|||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    error!("Couldn't get file's parents: {:?}", &filename);
 | 
					                    error!("Couldn't get file's parents: {:?}", &filename);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					            } else if err.kind() == ErrorKind::NotADirectory {
 | 
				
			||||||
 | 
					                // Example:
 | 
				
			||||||
 | 
					                // 1. example.com/user
 | 
				
			||||||
 | 
					                // 2. example.com/user/post
 | 
				
			||||||
 | 
					                // If file 1 exists it will prevent file 2 from existing
 | 
				
			||||||
 | 
					                // FIXME
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                error!("One of the parent directories is actually a file...")
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                error!("File open error: {err} {:?}", filename);
 | 
					                error!("File open error: {err} {:?}", filename);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										23
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								src/main.rs
									
									
									
									
									
								
							@@ -12,6 +12,7 @@ use futures_util::StreamExt;
 | 
				
			|||||||
use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
 | 
					use opentelemetry::{global::{self}, metrics::{Counter, Meter, UpDownCounter}};
 | 
				
			||||||
use opentelemetry_otlp::{Protocol, WithExportConfig};
 | 
					use opentelemetry_otlp::{Protocol, WithExportConfig};
 | 
				
			||||||
use db::{connect, Website};
 | 
					use db::{connect, Website};
 | 
				
			||||||
 | 
					use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
 | 
				
			||||||
use serde::Deserialize;
 | 
					use serde::Deserialize;
 | 
				
			||||||
use surrealdb::{engine::remote::ws::Client, Surreal};
 | 
					use surrealdb::{engine::remote::ws::Client, Surreal};
 | 
				
			||||||
use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
 | 
					use tokio::{io::{AsyncWriteExt, BufWriter}, sync::RwLock, task::JoinSet};
 | 
				
			||||||
@@ -91,9 +92,9 @@ async fn main() {
 | 
				
			|||||||
    println!("Logs and metrics are provided to the Grafana dashboard");
 | 
					    println!("Logs and metrics are provided to the Grafana dashboard");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Start TRACE / LOGGING / METRICS
 | 
					    // Start TRACE / LOGGING / METRICS
 | 
				
			||||||
    load_tracing(&CONFIG);
 | 
					    load_logging(&CONFIG); // this seems to be working ok
 | 
				
			||||||
    load_logging(&CONFIG);
 | 
					    global::set_tracer_provider(load_tracing(&CONFIG));
 | 
				
			||||||
    load_metrics(&CONFIG);
 | 
					    global::set_meter_provider(load_metrics(&CONFIG));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
 | 
					    // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
 | 
				
			||||||
    // let crawl_filter = "en.wikipedia.org/";
 | 
					    // let crawl_filter = "en.wikipedia.org/";
 | 
				
			||||||
@@ -263,25 +264,23 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
 | 
				
			|||||||
                // parsing_span.end();
 | 
					                // parsing_span.end();
 | 
				
			||||||
                BEING_PARSED.add(-1, &[]);
 | 
					                BEING_PARSED.add(-1, &[]);
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                trace!("Did not parse: {}", site.site.as_str());
 | 
					                trace!(url = site.site.as_str(), "Parse = False");
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // update self in db
 | 
					            // update self in db
 | 
				
			||||||
            site.crawled = true;
 | 
					            site.crawled = true;
 | 
				
			||||||
            site.status_code = code.as_u16();
 | 
					            site.status_code = code.as_u16();
 | 
				
			||||||
            Website::store_all(vec![site.clone()], &db).await;
 | 
					            Website::store_all(vec![site.clone()], &db).await;
 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            error!("File failed to cooperate: {:?}", path);
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
        error!("Failed to get: {}", &site.site);
 | 
					        error!(url = site.site.as_str(), "Failed to get: {}", &site.site);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    // process_span.end();
 | 
					    // process_span.end();
 | 
				
			||||||
    BEING_PROCESSED.add(-1, &[]);
 | 
					    BEING_PROCESSED.add(-1, &[]);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn load_tracing(config: &Config) {
 | 
					fn load_tracing(config: &Config) -> SdkTracerProvider {
 | 
				
			||||||
    // Send spans to Alloy (which will send them to Tempo)
 | 
					    // Send spans to Alloy (which will send them to Tempo)
 | 
				
			||||||
    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
 | 
					    let otlp_span = opentelemetry_otlp::SpanExporter::builder()
 | 
				
			||||||
        .with_tonic()
 | 
					        .with_tonic()
 | 
				
			||||||
@@ -291,8 +290,7 @@ fn load_tracing(config: &Config) {
 | 
				
			|||||||
    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
 | 
					    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
 | 
				
			||||||
        .with_simple_exporter(otlp_span)
 | 
					        .with_simple_exporter(otlp_span)
 | 
				
			||||||
        .build();
 | 
					        .build();
 | 
				
			||||||
 | 
					    tracer_provider
 | 
				
			||||||
    global::set_tracer_provider(tracer_provider);
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn load_logging(config: &Config) {
 | 
					fn load_logging(config: &Config) {
 | 
				
			||||||
@@ -327,7 +325,7 @@ fn load_logging(config: &Config) {
 | 
				
			|||||||
    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
 | 
					    tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn load_metrics(config: &Config) {
 | 
					fn load_metrics(config: &Config) -> SdkMeterProvider {
 | 
				
			||||||
    //  Send metrics to Prometheus
 | 
					    //  Send metrics to Prometheus
 | 
				
			||||||
    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
 | 
					    let otlp_metrics = opentelemetry_otlp::MetricExporter::builder()
 | 
				
			||||||
        .with_http()
 | 
					        .with_http()
 | 
				
			||||||
@@ -338,8 +336,7 @@ fn load_metrics(config: &Config) {
 | 
				
			|||||||
    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
 | 
					    let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
 | 
				
			||||||
        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
 | 
					        .with_periodic_exporter(otlp_metrics) // default delay is 60s, turn down to like 15
 | 
				
			||||||
        .build();
 | 
					        .build();
 | 
				
			||||||
 | 
					    metrics_provider
 | 
				
			||||||
    global::set_meter_provider(metrics_provider);
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user