foss_storage #3
								src/main.rs
@@ -24,6 +24,8 @@ const GET_IN_FLIGHT: &str = "gets_in_flight";
 const SITES_CRAWLED: &str = "pages_crawled";
 const BEING_PROCESSED: &str = "pages_being_processed";
 
+const BATCH_SIZE: usize = 2;
+
 #[derive(Deserialize)]
 struct Config {
     surreal_ns: String,
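Note: the new `BATCH_SIZE` constant is consumed in the final hunk below, where it replaces the hard-coded limit of 100 on how many uncrawled links a single database query may return.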
@@ -109,13 +111,7 @@ async fn main() {
     let span = trace_span!("Loop");
     let span = span.enter();
     while crawled < config.budget {
-        let get_num = if config.budget - crawled < 100 {
-            config.budget - crawled
-        } else {
-            100
-        };
-
-        let uncrawled = get_uncrawled_links(&db, get_num, config.crawl_filter.clone()).await;
+        let uncrawled = get_uncrawled_links(&db, config.budget - crawled, config.crawl_filter.clone()).await;
         if uncrawled.is_empty() {
             info!("Had more budget but finished crawling everything.");
             return;
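With the pre-clamping block gone, the loop hands the raw remaining budget to `get_uncrawled_links` and relies on the clamp inside that function (last hunk). The subtraction cannot underflow here because the `while crawled < config.budget` guard holds on entry; a defensive sketch, not part of this PR, would make that explicit:

```rust
// Sketch only (not in the PR): remaining budget as a total function,
// safe even if the loop guard ever changes.
fn remaining_budget(budget: usize, crawled: usize) -> usize {
    budget.saturating_sub(crawled)
}
```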
@@ -170,39 +166,44 @@ async fn process(mut site: Website, db: Surreal<Client>, reqwest: reqwest::Client
     // Send the http request (get)
     if let Ok(response) = request_builder.send().await {
 
-        // METRICS
-        g.decrement(1);
-        counter!(GET_METRIC).increment(1);
-
         // TODO if this will fail if the object we are downloading is
         // larger than the memory of the device it's running on.
         // We should store it *as* we download it then parse it in-place.
         // Get body from response
         let data = response
             .text()
             .await
             .expect("Failed to read http response's body!");
 
-        // Store document
-        filesystem::store(&data, &site.site).await;
+        // METRICS
+        g.decrement(1);
+        counter!(GET_METRIC).increment(1);
 
-        // Parse document and get relationships
-        let sites = parser::parse(&site, &data).await;
+        // Store document
+        let should_parse = filesystem::store(&data, &site.site).await;
+
+        if should_parse {
+            // Parse document and get relationships
+            let sites = parser::parse(&site, &data).await;
+
+            // De-duplicate this list
+            let prev_len = sites.len();
+            let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
+                set.insert(item);
+                set
+            });
+            let de_dupe_sites: Vec<Website> = set.into_iter().collect();
+            let diff = prev_len - de_dupe_sites.len();
+            trace!("Saved {diff} from being entered into the db by de-duping");
+
+            // Store all the other sites so that we can link to them.
+            let _ = Website::store_all(de_dupe_sites, &db).await;
+        }
 
         // update self in db
         site.set_crawled();
         Website::store_all(vec![site], &db).await;
 
-        // De-duplicate this list
-        let prev_len = sites.len();
-        let set = sites.into_iter().fold(HashSet::new(), |mut set, item| {
-            set.insert(item);
-            set
-        });
-        let de_dupe_sites: Vec<Website> = set.into_iter().collect();
-        let diff = prev_len - de_dupe_sites.len();
-        trace!("Saved {diff} from being entered into the db by de-duping");
-
-        // Store all the other sites so that we can link to them.
-        let _ = Website::store_all(de_dupe_sites, &db).await;
-
     } else {
         error!("Failed to get: {}", &site.site);
     }
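The de-duplication inside the new `should_parse` branch builds a `HashSet` with a manual fold. Assuming only what the fold itself already requires (`Website: Eq + Hash`), the same round-trip can be written with two `collect` calls; a sketch, not part of this PR:

```rust
use std::collections::HashSet;
use std::hash::Hash;

// Sketch (not in the PR): the fold-based de-duplication as a
// collect() round-trip; works for any Eq + Hash item, which the
// fold in the diff also requires of Website. Ordering is lost
// either way, exactly as with the fold.
fn de_dupe<T: Eq + Hash>(items: Vec<T>) -> Vec<T> {
    let set: HashSet<T> = items.into_iter().collect();
    set.into_iter().collect()
}
```

The TODO about bodies larger than memory points at the buffering `.text()` call. One way to approach it is to stream the body to disk chunk by chunk; parsing would then also need to happen in place, as the TODO itself notes. A minimal sketch, assuming tokio for file I/O (the function name and error handling are illustrative, not the project's actual filesystem module):

```rust
use tokio::io::AsyncWriteExt;

// Sketch of the TODO (not in the PR): write the body to disk as it
// downloads instead of buffering the whole thing with .text().
async fn stream_to_file(
    mut response: reqwest::Response,
    path: &std::path::Path,
) -> std::io::Result<()> {
    let mut file = tokio::fs::File::create(path).await?;
    // reqwest::Response::chunk() yields the next body chunk, if any.
    while let Some(chunk) = response.chunk().await.map_err(std::io::Error::other)? {
        file.write_all(&chunk).await?;
    }
    file.flush().await
}
```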
@@ -215,9 +216,11 @@ async fn get_uncrawled_links(
     mut count: usize,
     filter: String,
 ) -> Vec<Website> {
-    if count > 100 {
-        count = 100
+
+    if count > BATCH_SIZE {
+        count = BATCH_SIZE;
     }
 
     debug!("Getting uncrawled links");
 
     let mut response = db
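The clamp could also be written without the mutable rebinding via `Ord::min`; a one-line alternative, suggestion only:

```rust
const BATCH_SIZE: usize = 2;

// Suggestion (not in the PR): the same clamp as in
// get_uncrawled_links, expressed without mutation.
fn clamp_to_batch(count: usize) -> usize {
    count.min(BATCH_SIZE)
}
```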