Add garbage collection
We should normally NEVER have to garbage collect (meaning removing
chunks from the storage backend that don't exist in the meta DB). However,
due to bugs or manual database management, this can occur.
billyb2 committed Apr 26, 2024
1 parent 4d1e168 commit 522457b
Showing 6 changed files with 89 additions and 16 deletions.
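For intuition, a minimal standalone sketch of the set-difference idea behind this commit, using plain strings in place of the repo's ChunkID type (all names below are illustrative, not part of the commit):

use std::collections::HashSet;

// Illustrative only: a chunk in the storage backend is garbage when the
// meta DB has no record of its ID.
fn orphaned_chunks(storage: &[String], meta_ids: &HashSet<String>) -> Vec<String> {
    storage
        .iter()
        .filter(|id| !meta_ids.contains(*id))
        .cloned()
        .collect()
}

fn main() {
    let meta_ids: HashSet<String> = ["a", "b"].iter().map(|s| s.to_string()).collect();
    let storage = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // "c" is in storage but unknown to the meta DB, so it would be deleted.
    assert_eq!(orphaned_chunks(&storage, &meta_ids), vec!["c".to_string()]);
}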
36 changes: 21 additions & 15 deletions src/chunk_db/file.rs
@@ -1,5 +1,6 @@
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
 
+use bfsp::ChunkID;
 use tokio::fs;
 
 use crate::meta_db::MetaDB;
@@ -13,11 +14,7 @@ impl ChunkDB for FSChunkDB {
         Ok(Self)
     }
 
-    async fn get_chunk(
-        &self,
-        chunk_id: &bfsp::ChunkID,
-        user_id: i64,
-    ) -> anyhow::Result<Option<Vec<u8>>> {
+    async fn get_chunk(&self, chunk_id: &ChunkID, user_id: i64) -> anyhow::Result<Option<Vec<u8>>> {
         let path = Self::get_path(chunk_id, user_id).await;
         match fs::read(path).await {
             Ok(data) => Ok(Some(data)),
@@ -28,25 +25,20 @@ impl ChunkDB for FSChunkDB {
         }
     }
 
-    async fn put_chunk(
-        &self,
-        chunk_id: &bfsp::ChunkID,
-        user_id: i64,
-        data: &[u8],
-    ) -> anyhow::Result<()> {
+    async fn put_chunk(&self, chunk_id: &ChunkID, user_id: i64, data: &[u8]) -> anyhow::Result<()> {
         let path = Self::get_path(chunk_id, user_id).await;
         fs::write(path, data).await?;
         Ok(())
     }
 
-    async fn delete_chunk(&self, chunk_id: &bfsp::ChunkID, user_id: i64) -> anyhow::Result<()> {
+    async fn delete_chunk(&self, chunk_id: &ChunkID, user_id: i64) -> anyhow::Result<()> {
         let path = Self::get_path(chunk_id, user_id).await;
         fs::remove_file(path).await?;
 
         Ok(())
     }
 
-    async fn get_path(chunk_id: &bfsp::ChunkID, user_id: i64) -> String {
+    async fn get_path(chunk_id: &ChunkID, user_id: i64) -> String {
         let mut path = format!("./chunks/{user_id}/");
         fs::create_dir_all(&path).await.unwrap();
 
@@ -55,6 +47,20 @@ impl ChunkDB for FSChunkDB {
     }
 
     async fn garbage_collect(&self, meta_db: Arc<impl MetaDB>) -> anyhow::Result<()> {
-        todo!()
+        let chunk_ids = meta_db.list_all_chunk_ids().await?;
+        let chunk_ids = chunk_ids
+            .iter()
+            .map(|c| c.id.clone())
+            .collect::<HashSet<_>>();
+
+        let mut files = fs::read_dir("./chunks").await?;
+        while let Some(file) = files.next_entry().await? {
+            let file_name = file.file_name().into_string().unwrap();
+            if !chunk_ids.contains(&ChunkID::try_from(file_name.as_str()).unwrap().id) {
+                fs::remove_file(file.path()).await?;
+            }
+        }
+
+        Ok(())
     }
 }
5 changes: 5 additions & 0 deletions src/chunk_db/mod.rs
@@ -25,4 +25,9 @@ pub trait ChunkDB: Sized + Send + Sync {
         user_id: i64,
     ) -> impl Future<Output = anyhow::Result<()>> + Send;
     fn get_path(chunk_id: &ChunkID, user_id: i64) -> impl Future<Output = String> + Send;
+    /// Garbage collect the chunk db, making sure that any differences between meta_db and the storage backend are resolved.
+    fn garbage_collect(
+        &self,
+        meta_db: Arc<impl MetaDB>,
+    ) -> impl Future<Output = anyhow::Result<()>> + Send;
 }
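For illustration, a hedged sketch of how an in-memory backend might satisfy the new trait method; the MemChunkDB type below is hypothetical and not part of this commit:

use std::{collections::HashMap, sync::Arc};

use bfsp::ChunkID;
use tokio::sync::Mutex;

use crate::meta_db::MetaDB;

// Hypothetical in-memory backend, shown only to illustrate the contract:
// after a garbage_collect pass, every stored chunk ID also exists in the meta DB.
struct MemChunkDB {
    chunks: Mutex<HashMap<ChunkID, Vec<u8>>>,
}

impl MemChunkDB {
    async fn garbage_collect(&self, meta_db: Arc<impl MetaDB>) -> anyhow::Result<()> {
        let live = meta_db.list_all_chunk_ids().await?;
        // Drop every chunk the meta DB does not know about.
        self.chunks.lock().await.retain(|id, _| live.contains(id));
        Ok(())
    }
}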
43 changes: 43 additions & 0 deletions src/chunk_db/s3.rs
@@ -1,6 +1,10 @@
+use std::sync::Arc;
+
 use bfsp::ChunkID;
 use s3::{creds::Credentials, Bucket, Region};
 
+use crate::meta_db::MetaDB;
+
 use super::ChunkDB;
 
 pub struct S3ChunkDB {
@@ -60,4 +64,43 @@ impl ChunkDB for S3ChunkDB {
     async fn get_path(chunk_id: &ChunkID, user_id: i64) -> String {
         format!("/{}/{}", user_id, chunk_id)
     }
+
+    async fn garbage_collect(&self, meta_db: Arc<impl MetaDB>) -> anyhow::Result<()> {
+        let s3_chunks = self
+            .bucket
+            .list("".to_string(), None)
+            .await?
+            .into_iter()
+            .flat_map(|o| {
+                o.contents
+                    .iter()
+                    .map(|c| c.key.clone())
+                    .collect::<Vec<String>>()
+            });
+
+        let chunk_ids = meta_db.list_all_chunk_ids().await?;
+
+        let futures = s3_chunks
+            .filter(|c| {
+                let path = c.split('/').last().unwrap();
+                let chunk_id = ChunkID::try_from(path).unwrap();
+
+                !chunk_ids.contains(&chunk_id)
+            })
+            .map(|path| {
+                let bucket = self.bucket.clone();
+                let path = path.clone();
+
+                tokio::task::spawn(async move { bucket.delete_object(path).await })
+            });
+
+        for f in futures {
+            match f.await.unwrap() {
+                Ok(_) => log::info!("Garbage collected chunk!"),
+                Err(err) => log::error!("Failed to garbage collect chunk: {:?}", err),
+            }
+        }
+
+        Ok(())
+    }
 }
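One note on the shape of the garbage_collect body above: iterator adapters are lazy, so each tokio::task::spawn only fires when the for loop reaches it, meaning each delete is spawned and then immediately awaited. A hypothetical standalone sketch of the variation that starts every task before the first await, by collecting the handles into a Vec first:

// Hypothetical sketch, not part of this commit: collecting the JoinHandles
// eagerly forces every spawn to happen up front, so the simulated deletes
// run concurrently rather than one at a time.
#[tokio::main]
async fn main() {
    let paths = vec!["chunks/1/a", "chunks/1/b", "chunks/2/c"];

    // Eager fan-out: every task is spawned here.
    let handles: Vec<_> = paths
        .into_iter()
        .map(|path| tokio::task::spawn(async move { format!("deleted {path}") }))
        .collect();

    // Fan-in: results come back in order, but the work already ran concurrently.
    for handle in handles {
        println!("{}", handle.await.unwrap());
    }
}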
2 changes: 2 additions & 0 deletions src/main.rs
@@ -86,6 +86,8 @@ async fn main() -> Result<()> {
     #[cfg(not(debug_assertions))]
     let chunk_db = Arc::new(S3ChunkDB::new().unwrap());
 
+    chunk_db.garbage_collect(meta_db.clone()).await?;
+
     info!("Starting server!");
 
     let addr = match env::var("FLY_APP_NAME").is_ok() {
17 changes: 17 additions & 0 deletions src/meta_db.rs
@@ -54,6 +54,7 @@ pub trait MetaDB: Sized + Send + Sync {
         chunk_ids: HashSet<ChunkID>,
         user_id: i64,
     ) -> impl Future<Output = Result<HashMap<ChunkID, ChunkMetadata>>> + Send;
+    fn list_all_chunk_ids(&self) -> impl Future<Output = Result<HashSet<ChunkID>>> + Send;
 }
 
 pub struct PostgresMetaDB {
@@ -305,4 +306,20 @@ impl MetaDB for PostgresMetaDB {
 
         Ok(chunk_meta)
     }
+
+    fn list_all_chunk_ids(&self) -> impl Future<Output = Result<HashSet<ChunkID>>> + Send {
+        async move {
+            let chunk_meta: HashSet<_> = sqlx::query("select id from chunks")
+                .fetch_all(&self.pool)
+                .await?
+                .into_iter()
+                .map(|row| {
+                    let id: String = row.get("id");
+                    ChunkID::try_from(id.as_str()).unwrap()
+                })
+                .collect();
+
+            Ok(chunk_meta)
+        }
+    }
 }
2 changes: 1 addition & 1 deletion src/tls.rs
@@ -7,7 +7,7 @@ use rcgen::{Certificate, CertificateParams, DistinguishedName};
 
 use instant_acme::{
     Account, AccountCredentials, AuthorizationStatus, ChallengeType, Identifier, NewOrder,
-    OrderState, OrderStatus,
+    OrderStatus,
 };
 use sqlx::Row;
 use tokio::sync::RwLock;
