Skip to content

Commit

Permalink
refactor: Use opendal to replace object_store
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Nov 9, 2024
1 parent 67cb2a2 commit 7d04759
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 129 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"]
channels = ["dep:socketioxide"]
# Storage features
all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"]
storage_aws_s3 = ["object_store/aws"]
storage_azure = ["object_store/azure"]
storage_gcp = ["object_store/gcp"]
storage_aws_s3 = ["opendal/services-s3"]
storage_azure = ["opendal/services-azblob"]
storage_gcp = ["opendal/services-gcs"]
# Cache feature
cache_inmem = ["dep:moka"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
Expand Down Expand Up @@ -125,7 +125,7 @@ socketioxide = { version = "0.14.0", features = ["state"], optional = true }


# File Upload
object_store = { version = "0.11.0", default-features = false }
opendal = { version = "0.50.2", default-features = false,features = ["services-memory","services-fs"] }

# cache
moka = { version = "0.12.7", features = ["sync"], optional = true }
Expand Down
60 changes: 19 additions & 41 deletions src/storage/drivers/aws.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
#[cfg(test)]
use core::time::Duration;
use std::sync::Arc;

use object_store::{
aws::{AmazonS3Builder, AwsCredential},
StaticCredentialProvider,
};
#[cfg(test)]
use object_store::{BackoffConfig, RetryConfig};
use opendal::services::S3;
use opendal::Operator;

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::{opendal_adapter::OpendalAdapter, StoreDriver};
use crate::storage::StorageResult;

/// A set of AWS security credentials
#[derive(Debug)]
Expand All @@ -34,14 +26,10 @@ pub struct Credential {
/// # Errors
///
/// When could not initialize the client instance
pub fn new(bucket_name: &str, region: &str) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, region: &str) -> StorageResult<Box<dyn StoreDriver>> {
let s3 = S3::default().bucket(bucket_name).region(region);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Create new AWS s3 storage with bucket, region and credentials.
Expand All @@ -64,18 +52,16 @@ pub fn with_credentials(
bucket_name: &str,
region: &str,
credentials: Credential,
) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: credentials.key_id.to_string(),
secret_key: credentials.secret_key.to_string(),
token: credentials.token,
})))
.build()
.map_err(Box::from)?;
Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
) -> StorageResult<Box<dyn StoreDriver>> {
let mut s3 = S3::default()
.bucket(bucket_name)
.region(region)
.access_key_id(&credentials.key_id)
.secret_access_key(&credentials.secret_key);
if let Some(token) = credentials.token {
s3 = s3.session_token(&token);
}
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Build store with failure
Expand All @@ -86,15 +72,7 @@ pub fn with_credentials(
#[cfg(test)]
#[must_use]
pub fn with_failure() -> Box<dyn StoreDriver> {
let s3 = AmazonS3Builder::new()
.with_bucket_name("loco-test")
.with_retry(RetryConfig {
backoff: BackoffConfig::default(),
max_retries: 0,
retry_timeout: Duration::from_secs(0),
})
.build()
.unwrap();
let s3 = S3::default().bucket("loco-test");

Box::new(ObjectStoreAdapter::new(Box::new(s3)))
Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish()))
}
24 changes: 13 additions & 11 deletions src/storage/drivers/azure.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use object_store::azure::MicrosoftAzureBuilder;
use opendal::services::Azblob;
use opendal::Operator;

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;
use crate::storage::StorageResult;

/// Create new Azure storage.
///
Expand All @@ -18,13 +20,13 @@ pub fn new(
container_name: &str,
account_name: &str,
access_key: &str,
) -> Result<Box<dyn StoreDriver>> {
let azure = MicrosoftAzureBuilder::new()
.with_container_name(container_name)
.with_account(account_name)
.with_access_key(access_key)
.build()
.map_err(Box::from)?;
) -> StorageResult<Box<dyn StoreDriver>> {
let azure = Azblob::default()
.container(container_name)
.account_name(account_name)
.account_key(access_key);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure))))
Ok(Box::new(OpendalAdapter::new(
Operator::new(azure)?.finish(),
)))
}
27 changes: 11 additions & 16 deletions src/storage/drivers/gcp.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
use object_store::gcp::GoogleCloudStorageBuilder;
use opendal::services::Gcs;
use opendal::Operator;

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;
use crate::storage::StorageResult;

/// Create new GCP storage.
///
/// # Examples
///```
/// use loco_rs::storage::drivers::gcp;
/// let gcp_driver = gcp::new("key", "account_key", "service_account");
/// let gcp_driver = gcp::new("key", "credential_path");
/// ```
///
/// # Errors
///
/// When could not initialize the client instance
pub fn new(
bucket_name: &str,
service_account_key: &str,
service_account: &str,
) -> Result<Box<dyn StoreDriver>> {
let gcs = GoogleCloudStorageBuilder::new()
.with_bucket_name(bucket_name)
.with_service_account_key(service_account_key)
.with_service_account_path(service_account)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult<Box<dyn StoreDriver>> {
let gcs = Gcs::default()
.bucket(bucket_name)
.credential_path(credential_path);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs))))
Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish())))
}
22 changes: 14 additions & 8 deletions src/storage/drivers/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use object_store::local::LocalFileSystem;
use opendal::services::Fs;
use opendal::Operator;

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;
use crate::storage::StorageResult;

/// Create new filesystem storage with no prefix
///
Expand All @@ -12,7 +14,12 @@ use crate::Result;
/// ```
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new())))
let fs = Fs::default();
Box::new(OpendalAdapter::new(
Operator::new(fs)
.expect("fs service should build with success")
.finish(),
))
}

/// Create new filesystem storage with `prefix` applied to all paths
Expand All @@ -26,8 +33,7 @@ pub fn new() -> Box<dyn StoreDriver> {
/// # Errors
///
/// Returns an error if the path does not exist
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Box<dyn StoreDriver>> {
Ok(Box::new(ObjectStoreAdapter::new(Box::new(
LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?,
))))
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> StorageResult<Box<dyn StoreDriver>> {
let fs = Fs::default().root(&prefix.as_ref().display().to_string());
Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish())))
}
13 changes: 9 additions & 4 deletions src/storage/drivers/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use object_store::memory::InMemory;

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;
use opendal::services::Memory;
use opendal::Operator;

/// Create new in-memory storage.
///
Expand All @@ -11,5 +12,9 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
/// ```
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new())))
Box::new(OpendalAdapter::new(
Operator::new(Memory::default())
.expect("memory service must build with success")
.finish(),
))
}
24 changes: 20 additions & 4 deletions src/storage/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::path::Path;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::Reader;

#[cfg(feature = "storage_aws_s3")]
pub mod aws;
#[cfg(feature = "storage_azure")]
Expand All @@ -11,7 +13,7 @@ pub mod gcp;
pub mod local;
pub mod mem;
pub mod null;
pub mod object_store_adapter;
pub mod opendal_adapter;

use super::StorageResult;

Expand All @@ -21,9 +23,23 @@ pub struct UploadResponse {
pub version: Option<String>,
}

// TODO: need to properly abstract the object_store type in order to not
// strongly depend on it
pub type GetResponse = object_store::GetResult;
/// TODO: Add more methods to `GetResponse` to read the content in different ways
///
/// For example, we can read a specific range of bytes from the stream.
pub struct GetResponse {
stream: Reader,
}

impl GetResponse {
pub(crate) fn new(stream: Reader) -> Self {
Self { stream }
}

/// Read all content from the stream and return as `Bytes`.
pub async fn bytes(&self) -> StorageResult<Bytes> {
Ok(self.stream.read(..).await?.to_bytes())
}
}

#[async_trait]
pub trait StoreDriver: Sync + Send {
Expand Down
Loading

0 comments on commit 7d04759

Please sign in to comment.