epic metrics
src/db.rs (26 changes)
@@ -1,4 +1,5 @@
 use std::fmt::Debug;
+use metrics::counter;
 use serde::{Deserialize, Serialize};
 use surrealdb::{
     engine::remote::ws::{Client, Ws}, error::Db, opt::auth::Root, sql::Thing, Response, Surreal
@@ -8,6 +9,10 @@ use url::Url;

 use crate::{Config, Timer};

+const ROUND_TRIP_METRIC: &'static str = "surql_trips";
+const STORE: &'static str = "surql_store_calls";
+const LINK: &'static str = "surql_link_calls";
+
 #[derive(Serialize, Deserialize, Clone)]
 pub struct Website {
     /// The url that this data is found at
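
A note on how these metric-name constants behave (a standalone sketch, assuming the current `metrics` crate API in which `counter!` hands back a `Counter` handle; without an installed recorder the calls are no-ops, so this runs on its own):

// Sketch: each counter! invocation resolves the name against the
// installed recorder; holding the handle resolves once and reuses it,
// the pattern store() adopts further down.
use metrics::counter;

const ROUND_TRIP_METRIC: &str = "surql_trips";

fn main() {
    // One-shot: resolve and bump in a single expression.
    counter!(ROUND_TRIP_METRIC).increment(1);

    // Handle reuse: resolve the name once, increment many times.
    let trips = counter!(ROUND_TRIP_METRIC);
    trips.increment(1);
    trips.increment(1);
}
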
@@ -47,6 +52,7 @@ impl Website {

     #[instrument(skip_all)]
     pub async fn links_to(&self, other: Vec<Thing>, db: &Surreal<Client>) {

         let len = other.len();
+        if len == 0 {return}

@@ -57,6 +63,8 @@ impl Website {
         let timer = Timer::start(&msg);
         // prevent the timer from being dropped instantly.
         let _ = timer;
+        counter!(ROUND_TRIP_METRIC).increment(1);
+        counter!(LINK).increment(1);
         match db
             .query("COUNT(RELATE (SELECT id FROM website WHERE site = $in) -> links_to -> $out)")
             .bind(("in", from))
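
A review note on the `let _ = timer;` lines in this commit: binding to the wildcard `_` moves the value and drops it immediately, so it ends the timer on the spot rather than keeping it alive; a named binding like `let _timer` is what defers the drop to the end of scope. A standalone illustration (the `Noisy` type is invented for the demo, not from this repo):

// Sketch: `let _ = x` drops x at once; any named binding lives on.
struct Noisy(&'static str);

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropped: {}", self.0);
    }
}

fn main() {
    let a = Noisy("a");
    let _ = a; // moves `a` into `_` and drops it right here
    let _b = Noisy("b"); // `_b` is a real binding, dropped at scope end
    println!("end of scope");
    // Output order: dropped: a, end of scope, dropped: b
}
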
@@ -89,8 +97,12 @@ impl Website {

     #[instrument(name = "surql_store", skip_all)]
     pub async fn store(&self, db: &Surreal<Client>) -> Option<Thing> {
+        counter!(STORE).increment(1);
+        let counter = counter!(ROUND_TRIP_METRIC);
         let t = Timer::start("Stored link");
         let _ = t;
+        counter.increment(1);
+
         // check if it's been gone thru before
         let mut response = db
             .query("SELECT * FROM ONLY website WHERE site = $site LIMIT 1")
@@ -105,6 +117,7 @@ impl Website {
             let mut new = self.clone();
             new.crawled = old.crawled | new.crawled;

+            counter.increment(1);
             // update the record
             match db.upsert((id.tb, id.id.to_string())).content(new).await {
                 Ok(e) => {
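
The `old.crawled | new.crawled` merge above works because `|` on bool is a non-short-circuiting logical OR: once either copy of the record has seen the page crawled, the flag sticks. In isolation:

// Sketch: `|` on bool evaluates both sides and ORs them.
fn merge_crawled(old: bool, new: bool) -> bool {
    old | new // true once either record marks the page crawled
}

fn main() {
    assert!(merge_crawled(true, false));
    assert!(!merge_crawled(false, false));
}
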
@@ -130,6 +143,7 @@ impl Website {
                 };
             }
         } else {
+            counter.increment(1);
             // sites hasn't existed yet
             match db.create("website").content(self.clone()).await {
                 Ok(e) => {
@@ -165,23 +179,23 @@ pub struct Record {
 }

 #[instrument(skip_all, name = "SurrealDB")]
-pub async fn connect(config: &Config<'_>) -> surrealdb::Result<Surreal<Client>> {
+pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
     trace!("Establishing connection to surreal...");
     // Connect to the server
-    let db = Surreal::new::<Ws>(config.surreal_url).await?;
+    let db = Surreal::new::<Ws>(&config.surreal_url).await?;

     trace!("Logging in...");
     // Signin as a namespace, database, or root user
     db.signin(Root {
-        username: config.surreal_username,
-        password: config.surreal_password,
+        username: &config.surreal_username,
+        password: &config.surreal_password,
     })
     .await?;

     // Select a specific namespace / database
     db
-        .use_ns(config.surreal_ns)
-        .use_db(config.surreal_db)
+        .use_ns(&config.surreal_ns)
+        .use_db(&config.surreal_db)
     .await?;

     let setup = include_bytes!("setup.surql");
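
With Config's fields now owned Strings (see src/main.rs below), the borrows added here are what satisfy surrealdb's Root credentials, which take string slices. A minimal sketch of the ownership pattern, using an invented `Creds` stand-in rather than the real Root type (the "root" values echo the defaults removed from main.rs):

// Sketch: owned config, borrowed credentials; &String coerces to &str
// at the field boundary.
struct Creds<'a> {
    username: &'a str,
    password: &'a str,
}

struct OwnedConfig {
    surreal_username: String,
    surreal_password: String,
}

fn sign_in(config: &OwnedConfig) -> Creds<'_> {
    Creds {
        username: &config.surreal_username,
        password: &config.surreal_password,
    }
}

fn main() {
    let config = OwnedConfig {
        surreal_username: "root".into(),
        surreal_password: "root".into(),
    };
    let creds = sign_in(&config);
    assert_eq!(creds.username, "root");
}
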
src/main.rs (108 changes)
@@ -1,32 +1,43 @@
 #![feature(type_alias_impl_trait)]
 #![feature(const_async_blocks)]
+#![feature(ip_from)]

 extern crate html5ever;

-use std::time::Instant;
+use std::{fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant};

 use db::{connect, Website};
+use metrics::{counter, gauge};
+use metrics_exporter_prometheus::PrometheusBuilder;
 use s3::S3;
+use serde::Deserialize;
 use surrealdb::{engine::remote::ws::Client, Surreal};
-use tokio::{task::JoinSet};
+use tokio::task::JoinSet;
 use tracing::{debug, info, instrument, trace, trace_span, warn};
-use tracing_subscriber::{fmt::{self, time::LocalTime}, layer::{Filter, SubscriberExt}, EnvFilter, Layer, Registry};
+use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer, Registry};

 mod db;
 mod parser;
 mod s3;

-struct Config<'a> {
-    surreal_ns: &'a str,
-    surreal_db: &'a str,
-    surreal_url: &'a str,
-    surreal_username: &'a str,
-    surreal_password: &'a str,
-
-    s3_url: &'a str,
-    s3_bucket: &'a str,
-    s3_access_key: &'a str,
-    s3_secret_key: &'a str,
+const GET_METRIC: &'static str = "total_gets";
+const GET_IN_FLIGHT: &'static str = "gets_in_flight";
+const SITES_CRAWLED: &'static str = "pages_crawled";
+const BEING_PROCESSED: &'static str = "pages_being_processed";
+
+#[derive(Deserialize)]
+struct Config {
+    surreal_ns: String,
+    surreal_db: String,
+    surreal_url: String,
+    surreal_username: String,
+    surreal_password: String,
+
+    s3_url: String,
+    s3_bucket: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+
+    crawl_filter: String,
+    budget: usize,
 }

 #[tokio::main]
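
Deriving Deserialize is what lets this struct come straight out of Crawler.toml (read further down in main). A sketch of a file that would parse into it, wrapped in a test so it stays checked; the field names come from the struct above, the values mirror the defaults removed from main below, and the two keys are placeholders, not real credentials (assumes the `toml` crate the commit parses with):

// Sketch: round-tripping a hypothetical Crawler.toml through Config.
#[cfg(test)]
mod config_sketch {
    use super::Config;

    #[test]
    fn parses_example_toml() {
        let example = r#"
            surreal_ns = "test"
            surreal_db = "v1.12"
            surreal_url = "localhost:8000"
            surreal_username = "root"
            surreal_password = "root"

            s3_url = "http://localhost:9000"
            s3_bucket = "v1.12"
            s3_access_key = "example-access-key"
            s3_secret_key = "example-secret-key"

            crawl_filter = "en.wikipedia.org/"
            budget = 50
        "#;
        let config: Config = toml::from_str(example).expect("sketch config should parse");
        assert_eq!(config.budget, 50);
    }
}
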
@@ -48,38 +59,33 @@ async fn main() {
         // .with_timer(LocalTime::rfc_3339()) // Loki or alloy does this automatically
         .json()
         .with_writer(writer)
         // .with_filter(EnvFilter::from_default_env())
     );

     tracing::subscriber::set_global_default(registry).expect("Failed to set default subscriber");

     // tracing_subscriber::fmt()
     //     .with_env_filter(EnvFilter::from_default_env())
     //     .with_line_number(true)
     //     .with_thread_ids(true)
     //     .with_file(true)
     //     .with_timer(LocalTime::rfc_3339())
     //     .init();

+    let builder = PrometheusBuilder::new();
+    builder.with_http_listener(
+        std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::from_octets([0,0,0,0])), 2500)
+    )
+    .install()
+    .expect("failed to install recorder/exporter");
+
     debug!("Starting...");
     // Would probably take these in as parameters from a cli
     let starting_url = "https://en.wikipedia.org/";
     // When getting uncrawled pages, name must contain this variable. "" will effectively get ignored.
-    let crawl_filter = "en.wikipedia.org/";
-    let budget = 50;
+    // let crawl_filter = "en.wikipedia.org/";
+    // let budget = 50;
     let mut crawled = 0;

-    let config = Config {
-        surreal_url: "localhost:8000",
-        surreal_username: "root",
-        surreal_password: "root",
-        surreal_ns: "test",
-        surreal_db: "v1.12",
-        s3_bucket: "v1.12",
-        s3_url: "http://localhost:9000",
-        s3_access_key: "p8gXIZEO2FnWqWBiJYwo",
-        s3_secret_key: "1mRO0EYA2YAQ0xsKrlbkIIz4AT8KNXy6QIQPtxUu",
-    };
+    let mut file = File::open("./Crawler.toml").expect("Failed to read Crawler.toml");
+    let mut buf = String::new();
+    let _ = file.read_to_string(&mut buf);
+
+    let config: Config = toml::from_str(&buf).expect("Failed to parse Crawler.toml");

     let db = connect(&config)
         .await
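
The exporter binds 0.0.0.0:2500 for Prometheus to scrape. `Ipv4Addr::from_octets` is nightly-only, which is what the `ip_from` feature gate at the top of the file is for; `Ipv4Addr::UNSPECIFIED` is the stable spelling of 0.0.0.0. A sketch of the same listener on stable (assumes the metrics-exporter-prometheus builder API used above; `install()` needs an ambient Tokio runtime, which main() provides):

// Sketch: the same scrape endpoint built without the nightly feature.
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use metrics_exporter_prometheus::PrometheusBuilder;

#[tokio::main]
async fn main() {
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 2500);
    PrometheusBuilder::new()
        .with_http_listener(addr)
        .install() // spawns the HTTP scrape endpoint on the Tokio runtime
        .expect("failed to install recorder/exporter");
}
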
@@ -106,14 +112,14 @@ async fn main() {

     let span = trace_span!("Loop");
     let span = span.enter();
-    while crawled < budget {
-        let get_num = if budget - crawled < 100 {
-            budget - crawled
+    while crawled < config.budget {
+        let get_num = if config.budget - crawled < 100 {
+            config.budget - crawled
         } else {
             100
         };

-        let uncrawled = get_uncrawled_links(&db, get_num, crawl_filter.to_string()).await;
+        let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
         if uncrawled.len() == 0 {
             info!("Had more budget but finished crawling everything.");
             return;
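
The if/else computing get_num is a clamp: take what's left of the budget, capped at 100 per batch. The same logic reads as a one-liner, sketched here as a free function:

// Sketch: equivalent to the branch above (crawled <= budget is
// guaranteed by the loop condition, so the subtraction can't underflow).
fn batch_size(budget: usize, crawled: usize) -> usize {
    (budget - crawled).min(100)
}

fn main() {
    assert_eq!(batch_size(50, 0), 50);
    assert_eq!(batch_size(500, 150), 100);
}
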
@@ -126,16 +132,20 @@ async fn main() {
         {
             let mut futures = JoinSet::new();
             for site in uncrawled {
+                gauge!(BEING_PROCESSED).increment(1);
                 futures.spawn(get(site, db.clone(), reqwest.clone(), s3.clone()));
                 // technically the site hasn't be crawled *yet*, but the future
                 // where it is crawled has been set up.
-                crawled += 1;
-                // let percent = format!("{:.2}%", (crawled as f32 / budget as f32) * 100f32);
-                // info!("Crawled {crawled} out of {budget} pages. ({percent})");
             }
             debug!("Joining {} futures...", futures.len());
-            // join all the gets together
-            let _ = futures.join_all().await;
+
+            let c = counter!(SITES_CRAWLED);
+            // As futures complete runs code in while block
+            while let Some(_) = futures.join_next().await {
+                c.increment(1);
+                gauge!(BEING_PROCESSED).decrement(1);
+                crawled += 1;
+            }
         }
     }
     drop(span);
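
The switch from join_all to a join_next loop is what makes per-page metrics possible: join_all only returns once everything finishes, while join_next yields after each task completes. The pattern in isolation (names and workload are placeholders):

// Sketch: per-completion bookkeeping with JoinSet::join_next.
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut futures = JoinSet::new();
    for n in 0..10u32 {
        futures.spawn(async move { n * 2 });
    }
    let mut finished = 0;
    // Yields once per completed task, in completion order.
    while let Some(result) = futures.join_next().await {
        let _value = result.expect("task panicked");
        finished += 1; // the spot where the counter and gauge get updated
    }
    assert_eq!(finished, 10);
}
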
@@ -154,9 +164,15 @@ async fn get(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client, s3: S3) {
     let request_builder = reqwest.get(site.to_string());
     timer.stop();

+    let g = gauge!(GET_IN_FLIGHT);
+    g.increment(1);
+    let timer = Timer::start("Got page");
+
     if let Ok(response) = request_builder.send().await {
+        timer.stop();
+        g.decrement(1);
+        counter!(GET_METRIC).increment(1);
         debug!("Getting body...");

         // Get body
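
One review-style observation on this hunk: GET_IN_FLIGHT is incremented before send() but only decremented inside the Ok branch, so a failed request leaves the gauge permanently one too high. A hedged sketch of a drop-guard that decrements on every exit path (the `InFlight` type is invented for illustration, not part of this codebase):

// Sketch: RAII guard so an error or early return can't leak the gauge.
use metrics::{gauge, Gauge};

struct InFlight(Gauge);

impl InFlight {
    fn start(name: &'static str) -> Self {
        let g = gauge!(name);
        g.increment(1.0);
        InFlight(g)
    }
}

impl Drop for InFlight {
    fn drop(&mut self) {
        self.0.decrement(1.0); // runs on success, error, and unwind alike
    }
}

async fn tracked_get(client: &reqwest::Client, url: &str) -> reqwest::Result<reqwest::Response> {
    let _guard = InFlight::start("gets_in_flight");
    client.get(url).send().await
    // _guard drops here on both Ok and Err, decrementing the gauge
}
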
src/parser.rs

@@ -97,6 +97,7 @@ pub async fn parse(db: &Surreal<Client>, site: &mut Website, data: &str) {
     let mut links_to = Vec::with_capacity(other_sites.len());

     for a in other_sites {
+
         let other = a.store(db).await;
         if let Some(o) = other {
             links_to.push(o);
src/s3.rs

@@ -1,4 +1,5 @@
 use base64::{alphabet, engine::{self, general_purpose}, Engine};
+use metrics::counter;
 use minio::s3::{
     args::{BucketExistsArgs, MakeBucketArgs},
     client::ClientBuilder,
@@ -14,6 +15,8 @@ use crate::{Config, Timer};

 const CUSTOM_ENGINE: engine::GeneralPurpose = engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);

+const ROUND_TRIP_METRIC: &'static str = "s3_trips";
+
 #[derive(Clone)]
 pub struct S3 {
     bucket_name: String,
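
CUSTOM_ENGINE is what turns arbitrary URL paths into object-store-safe keys further down in store(): URL-safe base64 keeps '/' out of the encoded part, and NO_PAD drops trailing '='. A sketch of what it produces (the input URL is an example, not from a real crawl):

// Sketch: building an object key the way store() does.
use base64::{alphabet, engine::{self, general_purpose}, Engine};

const CUSTOM_ENGINE: engine::GeneralPurpose =
    engine::GeneralPurpose::new(&alphabet::URL_SAFE, general_purpose::NO_PAD);

fn object_key(domain: &str, path: &str) -> String {
    domain.to_owned() + &CUSTOM_ENGINE.encode(path)
}

fn main() {
    // Example input only; prints "example.comL3dpa2kvUnVzdA"
    println!("{}", object_key("example.com", "/wiki/Rust"));
}
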
@@ -22,7 +25,7 @@ pub struct S3 {

 impl S3 {
     #[instrument(skip_all, name = "S3")]
-    pub async fn connect(config: &Config<'_>) -> Result<Self, Error> {
+    pub async fn connect(config: &Config) -> Result<Self, Error> {
         let base_url = config
             .s3_url
             .parse::<BaseUrl>()
@@ -62,6 +65,7 @@ impl S3 {

     #[instrument(name = "s3_store", skip_all)]
     pub async fn store(&self, data: &str, url: &Url) {
+        let counter = counter!(ROUND_TRIP_METRIC);
         let t = Timer::start("Stored page");
         let _ = t; // prevent compiler drop
         if let Some(domain) = url.domain() {
@@ -69,6 +73,7 @@ impl S3 {

             trace!("Created filename: {filename} from raw: {}", url.to_string());

+            counter.increment(1);
             let _ = match &self
                 .client
                 .put_object_content(&self.bucket_name, &filename, data.to_owned())
@@ -83,6 +88,7 @@ impl S3 {

             let filename: String = domain.to_owned() + &CUSTOM_ENGINE.encode(url.path());

+            counter.increment(1);
             let _ = &self
                 .client
                 .put_object_content(&self.bucket_name, &filename, data.to_owned())