not yet
This commit is contained in:
parent
b9c1f0b492
commit
1b9964a938
38
Cargo.lock
generated
38
Cargo.lock
generated
@ -1961,6 +1961,25 @@ dependencies = [
|
|||||||
"generic-array",
|
"generic-array",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "internet_mapper"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.22.1",
|
||||||
|
"html5ever 0.29.1",
|
||||||
|
"metrics",
|
||||||
|
"metrics-exporter-prometheus",
|
||||||
|
"minio",
|
||||||
|
"reqwest",
|
||||||
|
"serde",
|
||||||
|
"surrealdb",
|
||||||
|
"tokio",
|
||||||
|
"toml",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ipnet"
|
name = "ipnet"
|
||||||
version = "2.11.0"
|
version = "2.11.0"
|
||||||
@ -4112,25 +4131,6 @@ version = "2.6.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "surreal_spider"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"base64 0.22.1",
|
|
||||||
"html5ever 0.29.1",
|
|
||||||
"metrics",
|
|
||||||
"metrics-exporter-prometheus",
|
|
||||||
"minio",
|
|
||||||
"reqwest",
|
|
||||||
"serde",
|
|
||||||
"surrealdb",
|
|
||||||
"tokio",
|
|
||||||
"toml",
|
|
||||||
"tracing",
|
|
||||||
"tracing-subscriber",
|
|
||||||
"url",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "surrealdb"
|
name = "surrealdb"
|
||||||
version = "2.2.1"
|
version = "2.2.1"
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "surreal_spider"
|
name = "internet_mapper"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
@ -3,13 +3,13 @@ surreal_url = "localhost:8000"
|
|||||||
surreal_username = "root"
|
surreal_username = "root"
|
||||||
surreal_password = "root"
|
surreal_password = "root"
|
||||||
surreal_ns = "test"
|
surreal_ns = "test"
|
||||||
surreal_db = "v1.15.4"
|
surreal_db = "v1.16"
|
||||||
|
|
||||||
# Minio config
|
# Minio config
|
||||||
s3_bucket = "v1.15.4"
|
s3_bucket = "v1.16"
|
||||||
s3_url = "http://localhost:9000"
|
s3_url = "http://localhost:9000"
|
||||||
s3_access_key = "3ptjsHhRHCHlpCmgFy9n"
|
s3_access_key = "DwJfDDVIbmCmfAblwSqp"
|
||||||
s3_secret_key = "68CmV07YExeCxb8kJhosSauEizj5CAE7PINZIfQz"
|
s3_secret_key = "V4UqvC1Vm4AwLE5FAhu2gxlMfvexTBQnDxuy8uZx"
|
||||||
|
|
||||||
# Crawler config
|
# Crawler config
|
||||||
crawl_filter = "en.wikipedia.com"
|
crawl_filter = "en.wikipedia.com"
|
||||||
|
93
src/db.rs
93
src/db.rs
@ -5,7 +5,7 @@ use base64::{
|
|||||||
};
|
};
|
||||||
use metrics::counter;
|
use metrics::counter;
|
||||||
use serde::{ser::SerializeStruct, Deserialize, Serialize};
|
use serde::{ser::SerializeStruct, Deserialize, Serialize};
|
||||||
use std::{fmt::Debug, sync::LazyLock, time::Instant};
|
use std::{collections::HashSet, fmt::Debug, sync::LazyLock, time::Instant};
|
||||||
use surrealdb::{
|
use surrealdb::{
|
||||||
engine::remote::ws::{Client, Ws},
|
engine::remote::ws::{Client, Ws},
|
||||||
opt::auth::Root,
|
opt::auth::Root,
|
||||||
@ -29,7 +29,7 @@ const TIME_SPENT_ON_LOCK: &'static str = "surql_lock_waiting_ms";
|
|||||||
const STORE: &'static str = "surql_store_calls";
|
const STORE: &'static str = "surql_store_calls";
|
||||||
const LINK: &'static str = "surql_link_calls";
|
const LINK: &'static str = "surql_link_calls";
|
||||||
|
|
||||||
#[derive(Deserialize, Clone)]
|
#[derive(Deserialize, Clone, Hash, Eq, PartialEq)]
|
||||||
pub struct Website {
|
pub struct Website {
|
||||||
/// The url that this data is found at
|
/// The url that this data is found at
|
||||||
pub site: Url,
|
pub site: Url,
|
||||||
@ -40,12 +40,13 @@ pub struct Website {
|
|||||||
impl Serialize for Website {
|
impl Serialize for Website {
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
S: serde::Serializer {
|
S: serde::Serializer,
|
||||||
let mut state = serializer.serialize_struct("Website", 2)?;
|
{
|
||||||
state.serialize_field("crawled", &self.crawled)?;
|
let mut state = serializer.serialize_struct("Website", 2)?;
|
||||||
// to_string() calls the correct naming of site
|
state.serialize_field("crawled", &self.crawled)?;
|
||||||
state.serialize_field("site", &self.site.to_string())?;
|
// to_string() calls the correct naming of site
|
||||||
state.end()
|
state.serialize_field("site", &self.site.to_string())?;
|
||||||
|
state.end()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,10 +64,7 @@ impl Website {
|
|||||||
Ok(a) => a,
|
Ok(a) => a,
|
||||||
Err(_) => todo!(),
|
Err(_) => todo!(),
|
||||||
};
|
};
|
||||||
Self {
|
Self { crawled, site }
|
||||||
crawled,
|
|
||||||
site
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_crawled(&mut self) {
|
pub fn set_crawled(&mut self) {
|
||||||
@ -78,7 +76,10 @@ impl Website {
|
|||||||
let domain = match site.domain() {
|
let domain = match site.domain() {
|
||||||
Some(s) => s.to_string(),
|
Some(s) => s.to_string(),
|
||||||
None => {
|
None => {
|
||||||
warn!("Failed to get domain of URL: {}, falling back to 'localhost'", site.to_string());
|
warn!(
|
||||||
|
"Failed to get domain of URL: {}, falling back to 'localhost'",
|
||||||
|
site.to_string()
|
||||||
|
);
|
||||||
"localhost".to_string()
|
"localhost".to_string()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -86,6 +87,7 @@ impl Website {
|
|||||||
|
|
||||||
domain + path
|
domain + path
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_url_as_b64_path(site: &Url) -> String {
|
pub fn get_url_as_b64_path(site: &Url) -> String {
|
||||||
let domain = site.domain().unwrap_or("DOMAIN").to_string();
|
let domain = site.domain().unwrap_or("DOMAIN").to_string();
|
||||||
let path = &CUSTOM_ENGINE.encode(site.path());
|
let path = &CUSTOM_ENGINE.encode(site.path());
|
||||||
@ -99,7 +101,7 @@ impl Website {
|
|||||||
if len == 0 {
|
if len == 0 {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let from = &self.site;
|
let from = &self.site;
|
||||||
|
|
||||||
// let to = other.site.to_string();
|
// let to = other.site.to_string();
|
||||||
@ -126,7 +128,10 @@ impl Website {
|
|||||||
trace!("Link for {from} OK - {num}/{len}");
|
trace!("Link for {from} OK - {num}/{len}");
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
error!("Didn't link all the records. {num}/{len}. Surreal response: {:?}", e);
|
error!(
|
||||||
|
"Didn't link all the records. {num}/{len}. Surreal response: {:?}",
|
||||||
|
e
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,48 +144,61 @@ impl Website {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn store_self(&self, db: &Surreal<Client>) {
|
||||||
|
counter!(STORE).increment(1);
|
||||||
|
|
||||||
|
db.query(
|
||||||
|
"INSERT INTO website $self
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
crawled = crawled OR $input.crawled
|
||||||
|
RETURN VALUE id;
|
||||||
|
",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("Failed to store self");
|
||||||
|
}
|
||||||
|
|
||||||
// Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
|
// Insert ever item in the vec into surreal, crawled state will be preserved as TRUE
|
||||||
// if already in the database as such or incoming data is TRUE.
|
// if already in the database as such or incoming data is TRUE.
|
||||||
pub async fn store_all(all: Vec<Self>, db: &Surreal<Client>) -> Vec<Thing> {
|
pub async fn store_all(all: HashSet<Self>, db: &Surreal<Client>) -> Vec<Thing> {
|
||||||
|
// NOTES:
|
||||||
|
// * all incoming Websites come in as !crawled
|
||||||
|
// * there are potentially duplicates in all
|
||||||
|
|
||||||
counter!(STORE).increment(1);
|
counter!(STORE).increment(1);
|
||||||
let mut things = Vec::with_capacity(all.len());
|
|
||||||
|
|
||||||
// TODO this only allows for one thread to be in the database at a time.
|
|
||||||
// This is currently required since otherwise we get write errors.
|
|
||||||
// If the default `crawled` is set to false, we might not need to write any more
|
|
||||||
// than just the name. `accessed_at` is fun but not needed.
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let lock = LOCK.lock().await;
|
let lock = LOCK.lock().await;
|
||||||
counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64);
|
counter!(TIME_SPENT_ON_LOCK).increment(now.elapsed().as_millis() as u64);
|
||||||
|
|
||||||
|
let mut results = Vec::with_capacity(all.len());
|
||||||
match db
|
match db
|
||||||
.query(
|
.query(
|
||||||
|
// TODO making this an upsert would make sense, but
|
||||||
|
// upserts seem to be broken.
|
||||||
|
//
|
||||||
|
// Doesn't look like upsert can take in an array, so insert
|
||||||
|
// it is...
|
||||||
|
//
|
||||||
"INSERT INTO website $array
|
"INSERT INTO website $array
|
||||||
ON DUPLICATE KEY UPDATE
|
ON DUPLICATE KEY UPDATE
|
||||||
accessed_at = time::now(),
|
last_write = time::now()
|
||||||
crawled = crawled OR $input.crawled
|
RETURN VALUE id;"
|
||||||
RETURN VALUE id;
|
,
|
||||||
",
|
|
||||||
)
|
)
|
||||||
.bind(("array", all))
|
.bind(("array", all))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(mut id) => match id.take::<Vec<Thing>>(0) {
|
Ok(mut id) => match id.take::<Vec<Thing>>(0) {
|
||||||
Ok(mut x) => things.append(&mut x),
|
Ok(mut x) => results.append(&mut x),
|
||||||
Err(err) => match err {
|
Err(err) => {
|
||||||
Api(error) => {
|
error!("{:?}", err);
|
||||||
eprintln!("{:?}", error);
|
}
|
||||||
error!("{:?}", error);
|
|
||||||
}
|
|
||||||
_ => error!("{:?}", err),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => error!("{:?}", err),
|
||||||
error!("{:?}", err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
drop(lock);
|
drop(lock);
|
||||||
things
|
results
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,3 +242,4 @@ pub async fn connect(config: &Config) -> surrealdb::Result<Surreal<Client>> {
|
|||||||
|
|
||||||
Ok(db)
|
Ok(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
20
src/main.rs
20
src/main.rs
@ -3,10 +3,7 @@
|
|||||||
extern crate html5ever;
|
extern crate html5ever;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
fs::File,
|
collections::HashSet, fs::File, io::Read, net::{IpAddr, Ipv4Addr}, time::Instant
|
||||||
io::Read,
|
|
||||||
net::{IpAddr, Ipv4Addr},
|
|
||||||
time::Instant,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use db::{connect, Website};
|
use db::{connect, Website};
|
||||||
@ -175,6 +172,7 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
|
|||||||
// METRICS
|
// METRICS
|
||||||
let g = gauge!(GET_IN_FLIGHT);
|
let g = gauge!(GET_IN_FLIGHT);
|
||||||
g.increment(1);
|
g.increment(1);
|
||||||
|
counter!(GET_METRIC).increment(1);
|
||||||
let timer = Timer::start("Got page");
|
let timer = Timer::start("Got page");
|
||||||
|
|
||||||
// Send the http request (get)
|
// Send the http request (get)
|
||||||
@ -183,7 +181,6 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
|
|||||||
// METRICS
|
// METRICS
|
||||||
timer.stop();
|
timer.stop();
|
||||||
g.decrement(1);
|
g.decrement(1);
|
||||||
counter!(GET_METRIC).increment(1);
|
|
||||||
|
|
||||||
// Get body from response
|
// Get body from response
|
||||||
let data = response
|
let data = response
|
||||||
@ -198,11 +195,18 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Clien
|
|||||||
|
|
||||||
// update self in db
|
// update self in db
|
||||||
site.set_crawled();
|
site.set_crawled();
|
||||||
Website::store_all(vec![site.clone()], &db).await;
|
site.store_self(&db).await;
|
||||||
|
|
||||||
|
// de duplicate this list
|
||||||
|
let set = sites.iter().fold(HashSet::new(), |mut set, item| {
|
||||||
|
// TODO seems kinda dumb to clone everything.
|
||||||
|
set.insert(item.clone());
|
||||||
|
set
|
||||||
|
});
|
||||||
|
trace!("Shrunk items to store from {} to {}", sites.len(), set.len());
|
||||||
|
|
||||||
// Store all the other sites so that we can link to them.
|
// Store all the other sites so that we can link to them.
|
||||||
// let mut links_to = Vec::new();
|
let others = Website::store_all(set, &db).await;
|
||||||
let others = Website::store_all(sites, &db).await;
|
|
||||||
|
|
||||||
// Make the database's links reflect the html links between sites
|
// Make the database's links reflect the html links between sites
|
||||||
site.links_to(others, &db).await;
|
site.links_to(others, &db).await;
|
||||||
|
@ -5,5 +5,4 @@ DEFINE INDEX IF NOT EXISTS idx ON TABLE website COLUMNS site UNIQUE;
|
|||||||
|
|
||||||
DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
|
DEFINE FIELD IF NOT EXISTS crawled ON TABLE website TYPE bool;
|
||||||
|
|
||||||
DEFINE FIELD IF NOT EXISTS accessed_at ON TABLE website VALUE time::now();
|
DEFINE FIELD IF NOT EXISTS created ON TABLE website VALUE time::now();
|
||||||
DEFINE FIELD IF NOT EXISTS first_accessed_at ON TABLE website VALUE time::now();
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user