Compare commits

...

1 commit

Author SHA1 Message Date
1b9964a938 not yet 2025-03-20 14:33:27 -06:00
6 changed files with 93 additions and 71 deletions

Cargo.lock (generated)

@@ -1961,6 +1961,25 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "internet_mapper"
+version = "0.1.0"
+dependencies = [
+ "base64 0.22.1",
+ "html5ever 0.29.1",
+ "metrics",
+ "metrics-exporter-prometheus",
+ "minio",
+ "reqwest",
+ "serde",
+ "surrealdb",
+ "tokio",
+ "toml",
+ "tracing",
+ "tracing-subscriber",
+ "url",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.11.0"
@@ -4112,25 +4131,6 @@ version = "2.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
 
-[[package]]
-name = "surreal_spider"
-version = "0.1.0"
-dependencies = [
- "base64 0.22.1",
- "html5ever 0.29.1",
- "metrics",
- "metrics-exporter-prometheus",
- "minio",
- "reqwest",
- "serde",
- "surrealdb",
- "tokio",
- "toml",
- "tracing",
- "tracing-subscriber",
- "url",
-]
-
 [[package]]
 name = "surrealdb"
 version = "2.2.1"

Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "surreal_spider"
+name = "internet_mapper"
 version = "0.1.0"
 edition = "2021"

(crawler configuration file)

@@ -3,13 +3,13 @@ surreal_url = "localhost:8000"
 surreal_username = "root"
 surreal_password = "root"
 surreal_ns = "test"
-surreal_db = "v1.15.4"
+surreal_db = "v1.16"
 
 # Minio config
-s3_bucket = "v1.15.4"
+s3_bucket = "v1.16"
 s3_url = "http://localhost:9000"
-s3_access_key = "3ptjsHhRHCHlpCmgFy9n"
-s3_secret_key = "68CmV07YExeCxb8kJhosSauEizj5CAE7PINZIfQz"
+s3_access_key = "DwJfDDVIbmCmfAblwSqp"
+s3_secret_key = "V4UqvC1Vm4AwLE5FAhu2gxlMfvexTBQnDxuy8uZx"
 
 # Crawler config
 crawl_filter = "en.wikipedia.com"

(Rust source: Website database module)

@@ -5,7 +5,7 @@ use base64::{
 };
 use metrics::counter;
 use serde::{ser::SerializeStruct, Deserialize, Serialize};
-use std::{fmt::Debug, sync::LazyLock, time::Instant};
+use std::{collections::HashSet, fmt::Debug, sync::LazyLock, time::Instant};
 use surrealdb::{
     engine::remote::ws::{Client, Ws},
     opt::auth::Root,
@@ -29,7 +29,7 @@ const TIME_SPENT_ON_LOCK: &'static str = "surql_lock_waiting_ms";
 const STORE: &'static str = "surql_store_calls";
 const LINK: &'static str = "surql_link_calls";
 
-#[derive(Deserialize, Clone)]
+#[derive(Deserialize, Clone, Hash, Eq, PartialEq)]
 pub struct Website {
     /// The url that this data is found at
     pub site: Url,
@@ -40,12 +40,13 @@ pub struct Website {
 impl Serialize for Website {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
-        S: serde::Serializer {
+        S: serde::Serializer,
+    {
         let mut state = serializer.serialize_struct("Website", 2)?;
         state.serialize_field("crawled", &self.crawled)?;
         // to_string() calls the correct naming of site
         state.serialize_field("site", &self.site.to_string())?;
         state.end()
     }
 }
@@ -63,10 +64,7 @@ impl Website {
             Ok(a) => a,
             Err(_) => todo!(),
         };
-        Self {
-            crawled,
-            site
-        }
+        Self { crawled, site }
     }
 
     pub fn set_crawled(&mut self) {
@@ -78,7 +76,10 @@ impl Website {
         let domain = match site.domain() {
             Some(s) => s.to_string(),
             None => {
-                warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string());
+                warn!(
+                    "Failed to get domain of URL: {}, falling back to 'localhost'",
+                    site.to_string()
+                );
                 "localhost".to_string()
             }
         };
@@ -86,6 +87,7 @@ impl Website {
         domain + path
     }
 
+
     pub fn get_url_as_b64_path(site: &Url) -> String {
         let domain = site.domain().unwrap_or("DOMAIN").to_string();
         let path = &CUSTOM_ENGINE.encode(site.path());
@@ -99,7 +101,7 @@ impl Website {
         if len == 0 {
             return;
         }
-
+
         let from = &self.site;
 
         // let to = other.site.to_string();
@@ -126,7 +128,10 @@ impl Website {
                     trace!("Link for {from} OK - {num}/{len}");
                     return;
                 } else {
-                    error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e);
+                    error!(
+                        "Didn't link all the records. {num}/{len}. Surreal response: {:?}",
+                        e
+                    );
                     return;
                 }
             }
@@ -139,48 +144,61 @@ impl Website {
         }
     }
 
+    pub async fn store_self(&self, db: &Surreal<Client>) {
+        counter!(STORE).increment(1);
+
+        db.query(
+            "INSERT INTO website $self
+                ON DUPLICATE KEY UPDATE
+                    crawled = crawled OR $input.crawled
+                RETURN VALUE id;
+            ",
+        )
+        .await
+        .expect("Failed to store self");
+    }
+
     // Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
     // if already in the database as such or incoming data is TRUE.
-    pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
+    pub async fn store_all(all: HashSet<Self>, db: &Surreal<Client>) -> Vec<Thing> {
+        // NOTES:
+        // * all incoming Websites come in as !crawled
+        // * there are potentially duplicates in all
         counter!(STORE).increment(1);
-        let mut things = Vec::with_capacity(all.len());
 
-        // TODO this only allows for one thread to be in the database at a time.
-        // This is currently required since otherwise we get write errors.
-        // If the default `crawled` is set to false, we might not need to write any more
-        // than just the name. `accessed_at` is fun but not needed.
         let now = Instant::now();
         let lock = LOCK.lock().await;
         counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64);
 
+        let mut results = Vec::with_capacity(all.len());
         match db
             .query(
-                // TODO making this an upsert would make sense, but
-                // upserts seem to be broken.
-                //
-                // Doesn't look like upsert can take in an array, so insert
-                // it is...
-                //
                 "INSERT INTO website $array
                     ON DUPLICATE KEY UPDATE
-                        accessed_at = time::now(),
-                        crawled = crawled OR $input.crawled
-                    RETURN VALUE id;
-                ",
+                        last_write = time::now()
+                    RETURN VALUE id;"
+            ,
             )
             .bind(("array", all))
            .await
         {
             Ok(mut id) => match id.take::<Vec<Thing>>(0) {
-                Ok(mut x) => things.append(&mut x),
-                Err(err) => match err {
-                    Api(error) => {
-                        eprintln!("{:?}", error);
-                        error!("{:?}", error);
-                    }
-                    _ => error!("{:?}", err),
-                },
+                Ok(mut x) => results.append(&mut x),
+                Err(err) => {
+                    error!("{:?}", err);
+                }
             },
-            Err(err) => {
-                error!("{:?}", err);
-            }
+            Err(err) => error!("{:?}", err),
         }
         drop(lock);
 
-        things
+        results
     }
 }
@@ -224,3 +242,4 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
 
     Ok(db)
 }
+
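One thing worth flagging in the new store_self(): its query string references $self, but nothing is ever bound to it, and $self does not appear to be one of SurrealDB's predefined parameters, so the INSERT may run with no payload. A hedged sketch of how the binding would normally look with the surrealdb client; the free-function shape and the $site parameter name are illustrative, not from the commit:

    use surrealdb::{engine::remote::ws::Client, Surreal};

    // Insert one Serialize-able record, binding it explicitly to the
    // parameter the query string refers to.
    async fn store_one<T: serde::Serialize + 'static>(
        db: &Surreal<Client>,
        website: T,
    ) -> surrealdb::Result<()> {
        db.query(
            "INSERT INTO website $site
                 ON DUPLICATE KEY UPDATE
                     crawled = crawled OR $input.crawled
                 RETURN VALUE id;",
        )
        .bind(("site", website))
        .await?;
        Ok(())
    }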

(Rust source: crawler main)

@@ -3,10 +3,7 @@
 extern crate html5ever;
 
 use std::{
-    fs::File,
-    io::Read,
-    net::{IpAddr, Ipv4Addr},
-    time::Instant,
+    collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant
 };
 
 use db::{connect, Website};
use db::{connect, Website}; use db::{connect, Website};
@@ -175,6 +172,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // METRICS
     let g = gauge!(GET_IN_FLIGHT);
     g.increment(1);
+    counter!(GET_METRIC).increment(1);
     let timer = Timer::start("Got page");
 
     // Send the http request (get)
@@ -183,7 +181,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // METRICS
     timer.stop();
     g.decrement(1);
-    counter!(GET_METRIC).increment(1);
 
     // Get body from response
     let data = response
@@ -198,11 +195,18 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client) {
     // update self in db
     site.set_crawled();
-    Website::store_all(vec![site.clone()], &db).await;
+    site.store_self(&db).await;
+
+    // de duplicate this list
+    let set = sites.iter().fold(HashSet::new(), |mut set, item| {
+        // TODO seems kinda dumb to clone everything.
+        set.insert(item.clone());
+        set
+    });
+    trace!("Shrunk items to store from {} to {}", sites.len(), set.len());
 
     // Store all the other sites so that we can link to them.
-    // let mut links_to = Vec::new();
-    let others = Website::store_all(sites, &db).await;
+    let others = Website::store_all(set, &db).await;
 
     // Make the database's links reflect the html links between sites
     site.links_to(others, &db).await;
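The fold above works, but collecting an iterator into a HashSet is the idiomatic one-liner, and the commit's own TODO about cloning goes away if the Vec isn't needed afterwards. A sketch under the same assumptions (sites: Vec<Website>, with Website carrying the new Hash/Eq derives from the db module above):

    use std::collections::HashSet;

    // Borrowing version: clones each Website, same cost as the fold in the diff.
    fn dedup(sites: &[Website]) -> HashSet<Website> {
        sites.iter().cloned().collect()
    }

    // Consuming version: moves the Websites into the set, no clones at all.
    fn dedup_owned(sites: Vec<Website>) -> HashSet<Website> {
        sites.into_iter().collect()
    }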

(SurrealQL schema)

@@ -5,5 +5,4 @@ DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
 DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
-DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
-DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
+DEFINE FIELD IF NOT EXISTS created ON TABLE website VALUE time::now();
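Since created is populated by the schema's VALUE time::now() rather than by the client, a quick sanity check is to read it back after a crawl. A hypothetical snippet (the show_created helper is illustrative, and it assumes serde_json is available for loosely-typed rows):

    use surrealdb::{engine::remote::ws::Client, Surreal};

    // Fetch a few rows to confirm the schema-managed timestamp is being set.
    async fn show_created(db: &Surreal<Client>) -> surrealdb::Result<()> {
        let mut response = db.query("SELECT site, created FROM website LIMIT 5;").await?;
        // Deserialize loosely; a typed struct would work just as well.
        let rows: Vec<serde_json::Value> = response.take(0)?;
        println!("{rows:#?}");
        Ok(())
    }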