Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
FS provider 0.15 (#24)
Browse files Browse the repository at this point in the history
* updated types to generated blobstore types

* updated blobstore interface to git dependency

updated readme with wasmcloud

* addressed clippy warnings
  • Loading branch information
brooksmtownsend authored Feb 8, 2021
1 parent 824d16c commit 331c419
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 102 deletions.
20 changes: 11 additions & 9 deletions fs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[package]
name = "wascc-fs"
version = "0.1.2"
authors = ["Kevin Hoffman <[email protected]>"]
name = "wasmcloud-fs"
version = "0.2.0"
authors = ["wasmCloud Team"]
edition = "2018"
homepage = "https://wascc.dev"
repository = "https://github.com/wascc/fs-provider"
description = "Blob store capability provider (local file system) for the waSCC runtime"
homepage = "https://wasmcloud.dev"
repository = "https://github.com/wasmcloud/capability-providers"
description = "Blob store capability provider (local file system) for the wasmCloud runtime"
license = "Apache-2.0"
documentation = "https://docs.rs/wascc-fs"
documentation = "https://docs.rs/wasmcloud-fs"
readme = "README.md"
keywords = ["webassembly", "wasm", "files", "wascc", "blobstore"]
keywords = ["webassembly", "wasm", "files", "wasmcloud", "blobstore"]
categories = ["wasm", "api-bindings"]

[lib]
Expand All @@ -20,6 +20,8 @@ crate-type = ["cdylib", "rlib"]
static_plugin = []

[dependencies]
wascc-codec = "0.8.1"
wascc-codec = "0.9.0"
log = "0.4.11"
env_logger = "0.7.1"
actor-blobstore = { git = "https://github.com/wasmcloud/actor-interfaces", branch = "main" }
actor-core = { git = "https://github.com/wasmcloud/actor-interfaces", branch = "main" }
12 changes: 6 additions & 6 deletions fs/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[![crates.io](https://img.shields.io/crates/v/wascc-fs.svg)](https://crates.io/crates/wascc-fs)&nbsp;
![Rust](https://github.com/wascc/fs-provider/workflows/Rust/badge.svg)
![license](https://img.shields.io/crates/l/wascc-fs.svg)&nbsp;
[![documentation](https://docs.rs/wascc-fs/badge.svg)](https://docs.rs/wascc-fs)
[![crates.io](https://img.shields.io/crates/v/wasmcloud-fs.svg)](https://crates.io/crates/wasmcloud-fs)&nbsp;
![Rust](https://github.com/wasmcloud/capability-providers/workflows/Rust/badge.svg)
![license](https://img.shields.io/crates/l/wasmcloud-fs.svg)&nbsp;
[![documentation](https://docs.rs/wasmcloud-fs/badge.svg)](https://docs.rs/wasmcloud-fs)

# File System Provider

The **waSCC** File System provider is a capability provider for the `wascap:blobstore` protocol. This generic protocol can be used to support capability providers like Amazon S3, Azure blob storage, Google blob storage, and more. This crate is an implementation of the protocol that operates on top of a designated root directory and can be used interchangeably with the larger cloud blob providers.
The **wasmCloud** File System provider is a capability provider for the `wasmcloud:blobstore` protocol. This generic protocol can be used to support capability providers like Amazon S3, Azure blob storage, Google blob storage, and more. This crate is an implementation of the protocol that operates on top of a designated root directory and can be used interchangeably with the larger cloud blob providers.

For this provider, the concept of a `container` is a directory beneath the root (specified via the `ROOT` configuration variable), while a `blob` corresponds to a file stored within one of the containers.

Because of the way WebAssembly and the waSCC host work, all `wascap:blobstore` capability providers must _stream_ files to and from the actor. This allows actors to unblock long enough to allow other messages from other providers to be processed and keeps the WebAssembly module from allocating too much memory.
Because of the way WebAssembly and the wasmCloud host work, all `wasmcloud:blobstore` capability providers must _stream_ files to and from the actor. This allows actors to unblock long enough to allow other messages from other providers to be processed and keeps the WebAssembly module from allocating too much memory.
126 changes: 39 additions & 87 deletions fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ extern crate wascc_codec as codec;
#[macro_use]
extern crate log;

use actor_blobstore::*;
use actor_core::CapabilityConfiguration;
use chunks::Chunks;
use codec::blobstore::*;
use codec::capabilities::{
CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
OP_GET_CAPABILITY_DESCRIPTOR,
};
use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::{deserialize, serialize};
use std::collections::HashMap;
Expand All @@ -20,23 +18,25 @@ use std::{
path::{Path, PathBuf},
sync::{Arc, RwLock},
};
use wascc_codec::core::CapabilityConfiguration;

mod chunks;

#[cfg(not(feature = "static_plugin"))]
capability_provider!(FileSystemProvider, FileSystemProvider::new);

const CAPABILITY_ID: &str = "wascc:blobstore";
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:blobstore";
const SYSTEM_ACTOR: &str = "system";
const FIRST_SEQ_NBR: u64 = 0;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 3; // Increment for each crates publish

/// Tuple of (expected sequence number, chunks)
type SequencedChunk = (u64, Vec<FileChunk>);

#[derive(Clone)]
pub struct FileSystemProvider {
dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
rootdir: RwLock<PathBuf>,
upload_chunks: RwLock<HashMap<String, (u64, Vec<FileChunk>)>>,
rootdir: Arc<RwLock<PathBuf>>,
upload_chunks: Arc<RwLock<HashMap<String, SequencedChunk>>>,
}

impl Default for FileSystemProvider {
Expand All @@ -45,8 +45,8 @@ impl Default for FileSystemProvider {

FileSystemProvider {
dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
rootdir: RwLock::new(PathBuf::new()),
upload_chunks: RwLock::new(HashMap::new()),
rootdir: Arc::new(RwLock::new(PathBuf::new())),
upload_chunks: Arc::new(RwLock::new(HashMap::new())),
}
}
}
Expand Down Expand Up @@ -101,7 +101,7 @@ impl FileSystemProvider {
container: blob.container,
};
let blob = sanitize_blob(&blob);
info!("Starting upload: {}/{}", blob.container, blob.id);
info!("Starting upload: {}/{}", blob.container.id, blob.id);
let bfile = self.blob_to_path(&blob);
std::fs::write(bfile, &[])?;
Ok(vec![])
Expand Down Expand Up @@ -134,7 +134,7 @@ impl FileSystemProvider {
} else {
Blob {
id: "none".to_string(),
container: "none".to_string(),
container: Container::new("none"),
byte_size: 0,
}
};
Expand All @@ -152,7 +152,7 @@ impl FileSystemProvider {
.map(|e| {
e.map(|e| Blob {
id: e.file_name().into_string().unwrap(),
container: container.id.to_string(),
container: container.clone(),
byte_size: e.metadata().unwrap().len(),
})
})
Expand All @@ -168,7 +168,7 @@ impl FileSystemProvider {
chunk: FileChunk,
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
let mut upload_chunks = self.upload_chunks.write().unwrap();
let key = actor.to_string() + &sanitize_id(&chunk.container) + &sanitize_id(&chunk.id);
let key = actor.to_string() + &sanitize_id(&chunk.container.id) + &sanitize_id(&chunk.id);
let total_chunk_count = chunk.total_bytes / chunk.chunk_size;

let (expected_sequence_no, chunks) = upload_chunks
Expand All @@ -182,13 +182,16 @@ impl FileSystemProvider {
{
let chunk = chunks.get(i).unwrap();
let bpath = Path::join(
&Path::join(&self.rootdir.read().unwrap(), sanitize_id(&chunk.container)),
&Path::join(
&self.rootdir.read().unwrap(),
sanitize_id(&chunk.container.id),
),
sanitize_id(&chunk.id),
);
let mut file = OpenOptions::new().create(false).append(true).open(bpath)?;
info!(
"Receiving file chunk: {} for {}/{}",
chunk.sequence_no, chunk.container, chunk.id
chunk.sequence_no, chunk.container.id, chunk.id
);

let count = file.write(chunk.chunk_bytes.as_ref())?;
Expand Down Expand Up @@ -223,7 +226,7 @@ impl FileSystemProvider {
let bpath = Path::join(
&Path::join(
&self.rootdir.read().unwrap(),
sanitize_id(&request.container),
sanitize_id(&request.container.id),
),
sanitize_id(&request.id),
);
Expand All @@ -236,7 +239,7 @@ impl FileSystemProvider {
};
let xfer = Transfer {
blob_id: sanitize_id(&request.id),
container: sanitize_id(&request.container),
container: Container::new(sanitize_id(&request.container.id)),
total_size: *byte_size,
chunk_size: chunk_size as _,
total_chunks: *byte_size / chunk_size as u64,
Expand All @@ -254,79 +257,26 @@ impl FileSystemProvider {
}

fn blob_to_path(&self, blob: &Blob) -> PathBuf {
let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.to_string());
let cdir = Path::join(&self.rootdir.read().unwrap(), blob.container.id.to_string());
Path::join(&cdir, blob.id.to_string())
}

fn container_to_path(&self, container: &Container) -> PathBuf {
Path::join(&self.rootdir.read().unwrap(), container.id.to_string())
}

fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
use OperationDirection::{ToActor, ToProvider};
Ok(serialize(
CapabilityDescriptor::builder()
.id(CAPABILITY_ID)
.name("waSCC Blob Store Provider (Disk/File System)")
.long_description(
"A waSCC blob store capability provider exposing a file system to actors",
)
.version(VERSION)
.revision(REVISION)
.with_operation(
OP_CREATE_CONTAINER,
ToProvider,
"Creates a new container/bucket",
)
.with_operation(
OP_REMOVE_CONTAINER,
ToProvider,
"Removes a container/bucket",
)
.with_operation(
OP_LIST_OBJECTS,
ToProvider,
"Lists objects within a container",
)
.with_operation(
OP_UPLOAD_CHUNK,
ToProvider,
"Uploads a chunk of a blob to an item in a container. Must start upload first",
)
.with_operation(
OP_START_UPLOAD,
ToProvider,
"Starts the chunked upload of a blob",
)
.with_operation(
OP_START_DOWNLOAD,
ToProvider,
"Starts the chunked download of a blob",
)
.with_operation(
OP_GET_OBJECT_INFO,
ToProvider,
"Retrieves metadata about a blob",
)
.with_operation(
OP_RECEIVE_CHUNK,
ToActor,
"Receives a chunk of a blob for download",
)
.build(),
)?)
}
}

fn sanitize_container(container: &Container) -> Container {
Container {
id: sanitize_id(&container.id),
}
}

fn sanitize_blob(blob: &Blob) -> Blob {
Blob {
id: sanitize_id(&blob.id),
byte_size: blob.byte_size,
container: sanitize_id(&blob.container),
container: Container::new(sanitize_id(&blob.container.id)),
}
}

Expand All @@ -347,7 +297,7 @@ fn dispatch_chunk(
if let Ok(chunk) = chunk {
let fc = FileChunk {
sequence_no: i as u64,
container: xfer.container.to_string(),
container: Container::new(xfer.container.id.clone()),
id: xfer.blob_id.to_string(),
chunk_bytes: chunk,
chunk_size: xfer.chunk_size,
Expand Down Expand Up @@ -386,7 +336,6 @@ impl CapabilityProvider for FileSystemProvider {
match op {
OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => Ok(vec![]),
OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
OP_CREATE_CONTAINER => self.create_container(actor, deserialize(msg)?),
OP_REMOVE_CONTAINER => self.remove_container(actor, deserialize(msg)?),
OP_REMOVE_OBJECT => self.remove_object(actor, deserialize(msg)?),
Expand All @@ -398,21 +347,24 @@ impl CapabilityProvider for FileSystemProvider {
_ => Err("bad dispatch".into()),
}
}

fn stop(&self) {
// No cleanup needed on stop at the moment
}
}

#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use super::{sanitize_blob, sanitize_container};
use crate::FileSystemProvider;
use codec::blobstore::{Blob, Container};
use actor_blobstore::{Blob, Container, FileChunk};
use actor_core::CapabilityConfiguration;
use std::collections::HashMap;
use std::env::temp_dir;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use wascc_codec::blobstore::FileChunk;
use wascc_codec::core::CapabilityConfiguration;

#[test]
fn no_hacky_hacky() {
Expand All @@ -422,7 +374,7 @@ mod tests {
let blob = Blob {
byte_size: 0,
id: "../passwd".to_string(),
container: "/etc/h4x0rd".to_string(),
container: Container::new("/etc/h4x0rd"),
};
let c = sanitize_container(&container);
let b = sanitize_blob(&blob);
Expand All @@ -431,18 +383,18 @@ mod tests {
// thereby not expose anything sensitive
assert_eq!(c.id, "etc_h4x0rd");
assert_eq!(b.id, "passwd");
assert_eq!(b.container, "etc_h4x0rd");
assert_eq!(b.container.id, "etc_h4x0rd");
}

#[test]
fn test_start_upload() {
let actor = "actor1";
let container = "container".to_string();
let container = Container::new("container");
let id = "blob".to_string();

let fs = FileSystemProvider::new();
let root_dir = setup_test_start_upload(&fs);
let upload_dir = Path::join(&root_dir, &container);
let upload_dir = Path::join(&root_dir, &container.id);
let bpath = create_dir(&upload_dir, &id);

let total_bytes = 6;
Expand Down

0 comments on commit 331c419

Please sign in to comment.