From 464fd3563deff909a4a05884474349bc25d65199 Mon Sep 17 00:00:00 2001
From: Eric Swanson <64809312+ericswanson-dfinity@users.noreply.github.com>
Date: Thu, 11 Mar 2021 19:19:25 -0800
Subject: [PATCH] feat: allow upload/retrieve of assets of arbitrary size from asset canister (#1482)

Per the [design doc](https://github.com/dfinity/sdk/blob/master/docs/design/asset-canister.adoc):

1. Adds asset canister methods:
   - create_batch()
   - create_chunk()
   - commit_batch()
   - get()
   - get_chunk()
2. Reworks the asset installer in `dfx` to use these methods. It can therefore upload assets that exceed the message ingress size. Separate work (which this PR enables) will have to update `agent-js` to download these large assets.

Other than allowing the storage and retrieval of large assets, this PR does not address:
- multiple content types and content encodings: in this PR `dfx` always uploads with `content-type: application/octet-stream` and `content-encoding: identity`
- smart/correct synchronization: in this PR `dfx` always uploads all assets (even those that have not changed), and still does not delete assets that were previously uploaded but no longer exist.
- All assets and chunks are uploaded in series. See https://github.com/dfinity/sdk/issues/1491 and https://github.com/dfinity/agent-rs/issues/125
- The `store()`, `retrieve()`, and `list()` method signatures are unchanged for the time being, for backward compatibility. Later work will remove the `retrieve()` method, change the semantics and parameters of `store()`, and change the parameters and return type of `list()`.

```
Uploading assets to asset canister...
  large-asset.bin 1/7 (1900000 bytes)
  large-asset.bin 2/7 (1900000 bytes)
  large-asset.bin 3/7 (1900000 bytes)
  large-asset.bin 4/7 (1900000 bytes)
  large-asset.bin 5/7 (1900000 bytes)
  large-asset.bin 6/7 (1900000 bytes)
  large-asset.bin 7/7 (1100000 bytes)
  index.js 1/1 (1218 bytes)
  sample-asset.txt 1/1 (24 bytes)
  index.js.map 1/1 (5625 bytes)
```
---
 CHANGELOG.adoc                        |  10 +-
 distributed-canisters.nix             |   8 +-
 e2e/tests-dfx/assetscanister.bash     |  44 +++
 src/dfx/src/lib/installers/assets.rs  | 323 +++++++++++++++++++--
 src/distributed/assetstorage.mo       |  82 ------
 src/distributed/assetstorage/Asset.mo |  67 +++++
 src/distributed/assetstorage/Batch.mo |  66 +++++
 src/distributed/assetstorage/Chunk.mo |  59 ++++
 src/distributed/assetstorage/Main.mo  | 400 ++++++++++++++++++++++++++
 src/distributed/assetstorage/Types.mo |  61 ++++
 src/distributed/assetstorage/Utils.mo |  33 +++
 11 files changed, 1041 insertions(+), 112 deletions(-)
 delete mode 100644 src/distributed/assetstorage.mo
 create mode 100644 src/distributed/assetstorage/Asset.mo
 create mode 100644 src/distributed/assetstorage/Batch.mo
 create mode 100644 src/distributed/assetstorage/Chunk.mo
 create mode 100644 src/distributed/assetstorage/Main.mo
 create mode 100644 src/distributed/assetstorage/Types.mo
 create mode 100644 src/distributed/assetstorage/Utils.mo

diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 5de5352f2e..69456ab42d 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -19,7 +19,15 @@ This commit also upgrades tokio and reqwest in order to work correctly. There ar
 Renamed the `project_name` in our own generated assets to `canister_name`, for things that are generated during canister build (and not project generation).
 ----
+== Asset Canister
+
+=== feat: The asset canister can now store assets that exceed the message ingress limit (2 MB)
+
+* Please note that neither the JS agent nor the HTTP server has been updated yet to serve such large assets.
+* The existing interface is left in place for backward compatibility, but deprecated:
+** retrieve(): use get() and get_chunk() instead
+** store(): use create_batch(), create_chunk(), and commit_batch() instead
+** list(): use keys() instead
+
 = 0.6.25
 
diff --git a/distributed-canisters.nix b/distributed-canisters.nix
index b54b966b17..52b9e64866 100644
--- a/distributed-canisters.nix
+++ b/distributed-canisters.nix
@@ -12,19 +12,19 @@ pkgs.runCommandNoCCLocal "distributed-canisters" {
 } ''
   mkdir -p $out
 
-  for canister_mo in ${distributed}/*.mo; do
-    canister_name=$(basename -s .mo $canister_mo)
+  for canister_dir in $(find ${distributed} -mindepth 1 -maxdepth 1 -type d); do
+    canister_name=$(basename $canister_dir)
 
     build_dir=$out/$canister_name
     mkdir -p $build_dir
 
     $moc/bin/moc \
-      $canister_mo \
+      $canister_dir/Main.mo \
       -o $build_dir/$canister_name.did \
       --idl \
       --package base $base
     $moc/bin/moc \
-      $canister_mo \
+      $canister_dir/Main.mo \
       -o $build_dir/$canister_name.wasm \
       -c --release \
       --package base $base
diff --git a/e2e/tests-dfx/assetscanister.bash b/e2e/tests-dfx/assetscanister.bash
index c17bf00f13..045e0b1de8 100644
--- a/e2e/tests-dfx/assetscanister.bash
+++ b/e2e/tests-dfx/assetscanister.bash
@@ -62,3 +62,47 @@ teardown() {
 it's cherry season
 CHERRIES" "$stdout"
 }
+
+@test 'can store arbitrarily large files' {
+    [ "$USE_IC_REF" ] && skip "skip for ic-ref" # this takes too long for ic-ref's wasm interpreter
+
+    install_asset assetscanister
+
+    dfx_start
+    dfx canister create --all
+    dfx build
+    dfx canister install e2e_project_assets
+
+    dd if=/dev/urandom of=src/e2e_project_assets/assets/large-asset.bin bs=1000000 count=6
+
+    dfx deploy
+
+    assert_command dfx canister call --query e2e_project_assets get '(record{key="/large-asset.bin";accept_encodings=vec{"identity"}})'
+    assert_match 'total_length = 6_000_000'
+    assert_match 'content_type = "application/octet-stream"'
+    assert_match 'content_encoding = "identity"'
+
+    assert_command dfx canister call --query e2e_project_assets get_chunk '(record{key="/large-asset.bin";content_encoding="identity";index=2})'
+
+    assert_command dfx canister call --query e2e_project_assets get_chunk '(record{key="/large-asset.bin";content_encoding="identity";index=3})'
+    assert_command_fail dfx canister call --query e2e_project_assets get_chunk '(record{key="/large-asset.bin";content_encoding="identity";index=4})'
+}
+
+@test "list() and keys() return asset keys" {
+    install_asset assetscanister
+
+    dfx_start
+    dfx canister create --all
+    dfx build
+    dfx canister install e2e_project_assets
+
+    assert_command dfx canister call --query e2e_project_assets list
+    assert_match '"/binary/noise.txt"'
+    assert_match '"/text-with-newlines.txt"'
+    assert_match '"/sample-asset.txt"'
+
+    assert_command dfx canister call --query e2e_project_assets keys
+    assert_match '"/binary/noise.txt"'
+    assert_match '"/text-with-newlines.txt"'
+    assert_match '"/sample-asset.txt"'
+}
diff --git a/src/dfx/src/lib/installers/assets.rs b/src/dfx/src/lib/installers/assets.rs
index 25184d7c65..46a57ce578 100644
--- a/src/dfx/src/lib/installers/assets.rs
+++ b/src/dfx/src/lib/installers/assets.rs
@@ -1,13 +1,273 @@
 use crate::lib::canister_info::assets::AssetsCanisterInfo;
 use crate::lib::canister_info::CanisterInfo;
-use crate::lib::error::DfxResult;
+use crate::lib::error::{DfxError, DfxResult};
 use crate::lib::waiter::waiter_with_timeout;
-use candid::Encode;
+use candid::{CandidType, Decode, Encode, Nat};
+use delay::{Delay, Waiter};
 use ic_agent::Agent;
+use ic_types::Principal;
+use serde::Deserialize;
+use std::path::PathBuf;
 use std::time::Duration;
 use walkdir::WalkDir;
 
+const CREATE_BATCH: &str = "create_batch";
+const CREATE_CHUNK: &str = "create_chunk";
+const COMMIT_BATCH: &str = "commit_batch";
+const MAX_CHUNK_SIZE: usize = 1_900_000;
+
+#[derive(CandidType, Debug)]
+struct CreateBatchRequest {}
+
+#[derive(CandidType, Debug, Deserialize)]
+struct CreateBatchResponse {
+    batch_id: Nat,
+}
+
+#[derive(CandidType, Debug, Deserialize)]
+struct CreateChunkRequest<'a> {
+    batch_id: Nat,
+    #[serde(with = "serde_bytes")]
+    content: &'a [u8],
+}
+
+#[derive(CandidType, Debug, Deserialize)]
+struct CreateChunkResponse {
+    chunk_id: Nat,
+}
+
+#[derive(CandidType, Debug)]
+struct GetRequest {
+    key: String,
+    accept_encodings: Vec<String>,
+}
+
+#[derive(CandidType, Debug, Deserialize)]
+struct GetResponse {
+    #[serde(with = "serde_bytes")]
+    contents: Vec<u8>,
+    content_type: String,
+    content_encoding: String,
+}
+
+#[derive(CandidType, Debug)]
+struct CreateAssetArguments {
+    key: String,
+    content_type: String,
+}
+#[derive(CandidType, Debug)]
+struct SetAssetContentArguments {
+    key: String,
+    content_encoding: String,
+    chunk_ids: Vec<Nat>,
+}
+#[derive(CandidType, Debug)]
+struct UnsetAssetContentArguments {
+    key: String,
+    content_encoding: String,
+}
+#[derive(CandidType, Debug)]
+struct DeleteAssetArguments {
+    key: String,
+}
+#[derive(CandidType, Debug)]
+struct ClearArguments {}
+
+#[derive(CandidType, Debug)]
+enum BatchOperationKind {
+    CreateAsset(CreateAssetArguments),
+
+    SetAssetContent(SetAssetContentArguments),
+
+    _UnsetAssetContent(UnsetAssetContentArguments),
+
+    DeleteAsset(DeleteAssetArguments),
+
+    _Clear(ClearArguments),
+}
+
+#[derive(CandidType, Debug)]
+struct CommitBatchArguments<'a> {
+    batch_id: &'a Nat,
+    operations: Vec<BatchOperationKind>,
+}
+
+#[derive(Clone, Debug)]
+struct AssetLocation {
+    source: PathBuf,
+    key: String,
+}
+
+struct ChunkedAsset {
+    asset_location: AssetLocation,
+    chunk_ids: Vec<Nat>,
+}
+
+async fn create_chunk(
+    agent: &Agent,
+    canister_id: &Principal,
+    timeout: Duration,
+    batch_id: &Nat,
+    content: &[u8],
+) -> DfxResult<Nat> {
+    let batch_id = batch_id.clone();
+    let args = CreateChunkRequest { batch_id, content };
+    let args = candid::Encode!(&args)?;
+
+    let mut waiter = Delay::builder()
+        .timeout(std::time::Duration::from_secs(30))
+        .throttle(std::time::Duration::from_secs(1))
+        .build();
+    waiter.start();
+
+    loop {
+        match agent
+            .update(&canister_id, CREATE_CHUNK)
+            .with_arg(&args)
+            .expire_after(timeout)
+            .call_and_wait(waiter_with_timeout(timeout))
+            .await
+            .map_err(DfxError::from)
+            .and_then(|response| {
+                candid::Decode!(&response, CreateChunkResponse)
+                    .map_err(DfxError::from)
+                    .map(|x| x.chunk_id)
+            }) {
+            Ok(chunk_id) => {
+                break Ok(chunk_id);
+            }
+            Err(agent_err) => match waiter.wait() {
+                Ok(()) => {}
+                Err(_) => break Err(agent_err),
+            },
+        }
+    }
+}
+
+async fn make_chunked_asset(
+    agent: &Agent,
+    canister_id: &Principal,
+    timeout: Duration,
+    batch_id: &Nat,
+    asset_location: AssetLocation,
+) -> DfxResult<ChunkedAsset> {
+    let content = &std::fs::read(&asset_location.source)?;
+
+    // ?? doesn't work: rust lifetimes + task::spawn = tears
+    // how to deal with lifetimes for agent and canister_id here
+    // this function won't exit until after the task is joined...
+    // let chunks_future_tasks: Vec<_> = content
+    //     .chunks(MAX_CHUNK_SIZE)
+    //     .map(|content| task::spawn(create_chunk(agent, canister_id, timeout, batch_id, content)))
+    //     .collect();
+    // println!("await chunk creation");
+    // let but_lifetimes = try_join_all(chunks_future_tasks)
+    //     .await?
+    //     .into_iter()
+    //     .collect::<DfxResult<Vec<Nat>>>()
+    //     .map(|chunk_ids| ChunkedAsset {
+    //         asset_location,
+    //         chunk_ids,
+    //     });
+    // ?? doesn't work
+
+    // works (sometimes), does more work concurrently, but often doesn't work against bootstrap.
+    // (connection stuck in odd idle state: all agent requests return "channel closed" error.)
+    // let chunks_futures: Vec<_> = content
+    //     .chunks(MAX_CHUNK_SIZE)
+    //     .map(|content| create_chunk(agent, canister_id, timeout, batch_id, content))
+    //     .collect();
+    // println!("await chunk creation");
+    //
+    // try_join_all(chunks_futures)
+    //     .await
+    //     .map(|chunk_ids| ChunkedAsset {
+    //         asset_location,
+    //         chunk_ids,
+    //     })
+    // works (sometimes)
+
+    let mut chunk_ids: Vec<Nat> = vec![];
+    let chunks = content.chunks(MAX_CHUNK_SIZE);
+    let (num_chunks, _) = chunks.size_hint();
+    for (i, data_chunk) in chunks.enumerate() {
+        println!(
+            "  {} {}/{} ({} bytes)",
+            &asset_location.key,
+            i + 1,
+            num_chunks,
+            data_chunk.len()
+        );
+        chunk_ids.push(create_chunk(agent, canister_id, timeout, batch_id, data_chunk).await?);
+    }
+    Ok(ChunkedAsset {
+        asset_location,
+        chunk_ids,
+    })
+}
+
+async fn make_chunked_assets(
+    agent: &Agent,
+    canister_id: &Principal,
+    timeout: Duration,
+    batch_id: &Nat,
+    locs: Vec<AssetLocation>,
+) -> DfxResult<Vec<ChunkedAsset>> {
+    // this neat futures version works faster in parallel when it works,
+    // but does not work often when connecting through the bootstrap.
+    // let futs: Vec<_> = locs
+    //     .into_iter()
+    //     .map(|loc| make_chunked_asset(agent, canister_id, timeout, batch_id, loc))
+    //     .collect();
+    // try_join_all(futs).await
+    let mut chunked_assets = vec![];
+    for loc in locs {
+        chunked_assets.push(make_chunked_asset(agent, canister_id, timeout, batch_id, loc).await?);
+    }
+    Ok(chunked_assets)
+}
+
+async fn commit_batch(
+    agent: &Agent,
+    canister_id: &Principal,
+    timeout: Duration,
+    batch_id: &Nat,
+    chunked_assets: Vec<ChunkedAsset>,
+) -> DfxResult {
+    let operations: Vec<_> = chunked_assets
+        .into_iter()
+        .map(|chunked_asset| {
+            let key = chunked_asset.asset_location.key;
+            vec![
+                BatchOperationKind::DeleteAsset(DeleteAssetArguments { key: key.clone() }),
+                BatchOperationKind::CreateAsset(CreateAssetArguments {
+                    key: key.clone(),
+                    content_type: "application/octet-stream".to_string(),
+                }),
+                BatchOperationKind::SetAssetContent(SetAssetContentArguments {
+                    key,
+                    content_encoding: "identity".to_string(),
+                    chunk_ids: chunked_asset.chunk_ids,
+                }),
+            ]
+        })
+        .flatten()
+        .collect();
+    let arg = CommitBatchArguments {
+        batch_id,
+        operations,
+    };
+    let arg = candid::Encode!(&arg)?;
+    agent
+        .update(&canister_id, COMMIT_BATCH)
+        .with_arg(arg)
+        .expire_after(timeout)
+        .call_and_wait(waiter_with_timeout(timeout))
+        .await?;
+    Ok(())
+}
+
 pub async fn post_install_store_assets(
     info: &CanisterInfo,
     agent: &Agent,
@@ -16,28 +276,41 @@ pub async fn post_install_store_assets(
     let assets_canister_info = info.as_info::<AssetsCanisterInfo>()?;
     let output_assets_path = assets_canister_info.get_output_assets_path();
 
-    let walker = WalkDir::new(output_assets_path).into_iter();
-    for entry in walker {
-        let entry = entry?;
-        if entry.file_type().is_file() {
-            let source = entry.path();
-            let relative = source
-                .strip_prefix(output_assets_path)
-                .expect("cannot strip prefix");
-            let content = &std::fs::read(&source)?;
-            let path = String::from("/") + relative.to_string_lossy().as_ref();
-            let blob = candid::Encode!(&path, &content)?;
-
-            let canister_id = info.get_canister_id().expect("Could not find canister ID.");
-            let method_name = String::from("store");
-
-            agent
-                .update(&canister_id, &method_name)
-                .with_arg(&blob)
-                .expire_after(timeout)
-                .call_and_wait(waiter_with_timeout(timeout))
-                .await?;
-        }
-    }
+    let asset_locations: Vec<AssetLocation> = WalkDir::new(output_assets_path)
+        .into_iter()
+        .filter_map(|r| {
+            r.ok().filter(|entry| entry.file_type().is_file()).map(|e| {
+                let source = e.path().to_path_buf();
+                let relative = source
+                    .strip_prefix(output_assets_path)
+                    .expect("cannot strip prefix");
+                let key = String::from("/") + relative.to_string_lossy().as_ref();
+
+                AssetLocation { source, key }
+            })
+        })
+        .collect();
+
+    let canister_id = info.get_canister_id().expect("Could not find canister ID.");
+
+    let batch_id = create_batch(agent, &canister_id, timeout).await?;
+
+    let chunked_assets =
+        make_chunked_assets(agent, &canister_id, timeout, &batch_id, asset_locations).await?;
+
+    commit_batch(agent, &canister_id, timeout, &batch_id, chunked_assets).await?;
+
     Ok(())
 }
+
+async fn create_batch(agent: &Agent, canister_id: &Principal, timeout: Duration) -> DfxResult<Nat> {
+    let create_batch_args = CreateBatchRequest {};
+    let response = agent
+        .update(&canister_id, CREATE_BATCH)
+        .with_arg(candid::Encode!(&create_batch_args)?)
+        .expire_after(timeout)
+        .call_and_wait(waiter_with_timeout(timeout))
+        .await?;
+    let create_batch_response = candid::Decode!(&response, CreateBatchResponse)?;
+    Ok(create_batch_response.batch_id)
+}
diff --git a/src/distributed/assetstorage.mo b/src/distributed/assetstorage.mo
deleted file mode 100644
index 3055b95600..0000000000
--- a/src/distributed/assetstorage.mo
+++ /dev/null
@@ -1,82 +0,0 @@
-import Error "mo:base/Error";
-import Iter "mo:base/Iter";
-import Array "mo:base/Array";
-import Text "mo:base/Text";
-import Tree "mo:base/RBTree";
-
-shared ({caller = creator}) actor class () {
-
-    public type Path = Text;
-    public type Contents = Blob;
-
-    public type HeaderField = (Text, Text);
-
-    public type HttpRequest = {
-        method: Text;
-        url: Text;
-        headers: [HeaderField];
-        body: Blob;
-    };
-    public type HttpResponse = {
-        status_code: Nat16;
-        headers: [HeaderField];
-        body: Blob;
-    };
-
-    stable var authorized: [Principal] = [creator];
-
-    let db: Tree.RBTree<Path, Contents> = Tree.RBTree<Path, Contents>(Text.compare);
-
-    public shared ({ caller }) func authorize(other: Principal) : async () {
-        if (isSafe(caller)) {
-            authorized := Array.append(authorized, [other]);
-        } else {
-            throw Error.reject("not authorized");
-        }
-    };
-
-    public shared ({ caller }) func store(path : Path, contents : Contents) : async () {
-        if (isSafe(caller)) {
-            db.put(path, contents);
-        } else {
-            throw Error.reject("not authorized");
-        };
-    };
-
-    public query func retrieve(path : Path) : async Contents {
-        switch (db.get(path)) {
-        case null throw Error.reject("not found");
-        case (?contents) contents;
-        };
-    };
-
-    public query func list() : async [Path] {
-        let iter = Iter.map<(Path, Contents), Path>(db.entries(), func (path, _) = path);
-        Iter.toArray(iter)
-    };
-
-    func isSafe(caller: Principal) : Bool {
-        func eq(value: Principal): Bool = value == caller;
-        Array.find(authorized, eq) != null
-    };
-
-    public query func http_request(request: HttpRequest): async HttpResponse {
-        let content = getContent(request.url);
-
-        switch (content) {
-        case null {{ status_code = 404; headers = []; body = "" }};
-        case (?c) {{ status_code = 200; headers = []; body = c }};
-        }
-    };
-
-    private func getContent(uri: Text): ?Blob {
-        let splitted = Text.split(uri, #char '?');
-        let array = Iter.toArray(splitted);
-        let path = array[0];
-
-        switch (db.get(path)) {
-        case null db.get("/index.html");
-        case (?contents) ?contents;
-        }
-    };
-};
diff --git a/src/distributed/assetstorage/Asset.mo b/src/distributed/assetstorage/Asset.mo
new file mode 100644
index 0000000000..f6c0a89ede
--- /dev/null
+++ b/src/distributed/assetstorage/Asset.mo
@@ -0,0 +1,67 @@
+import HashMap "mo:base/HashMap";
+import Iter "mo:base/Iter";
+import Text "mo:base/Text";
+
+import T "Types";
+import U "Utils";
+
+module {
+  public type AssetEncoding = {
+    contentEncoding: Text;
+    content: [Blob];
+    totalLength: Nat;
+  };
+
+  public class Asset(
+    initContentType: Text,
+    initEncodings: HashMap.HashMap<Text, AssetEncoding>
+  ) {
+    public let contentType = initContentType;
+    let encodings = initEncodings;
+
+    // Naive encoding selection: of the accepted encodings, pick the first available.
+    public func chooseEncoding(acceptEncodings : [Text]) : ?AssetEncoding {
+      for (acceptEncoding in acceptEncodings.vals()) {
+        switch (encodings.get(acceptEncoding)) {
+          case null {};
+          case (?encoding) return ?encoding;
+        }
+      };
+      null
+    };
+
+    public func getEncoding(encodingType: Text): ?AssetEncoding {
+      encodings.get(encodingType)
+    };
+
+    public func setEncoding(encodingType: Text, encoding: AssetEncoding) {
+      encodings.put(encodingType, encoding)
+    };
+
+    public func unsetEncoding(encodingType: Text) {
+      encodings.delete(encodingType)
+    };
+
+    public func toStableAsset() : StableAsset = {
+      contentType = contentType;
+      encodings = Iter.toArray(encodings.entries());
+    };
+  };
+
+  public type StableAsset = {
+    contentType: Text;
+    encodings: [(Text, AssetEncoding)];
+  };
+
+  public func toStableAssetEntry((k: T.Key, v: Asset)) : ((T.Key, StableAsset)) {
+    (k, v.toStableAsset())
+  };
+
+  public func toAssetEntry((k: T.Key, v: StableAsset)) : ((T.Key, Asset)) {
+    let a = Asset(
+      v.contentType,
+      HashMap.fromIter<Text, AssetEncoding>(v.encodings.vals(), 7, Text.equal, Text.hash)
+    );
+    (k, a)
+  };
+}
diff --git a/src/distributed/assetstorage/Batch.mo b/src/distributed/assetstorage/Batch.mo
new file mode 100644
index 0000000000..ade6f42f04
--- /dev/null
+++ b/src/distributed/assetstorage/Batch.mo
@@ -0,0 +1,66 @@
+import Debug "mo:base/Debug";
+import HashMap "mo:base/HashMap";
+import Int "mo:base/Int";
+import Time "mo:base/Time";
+
+import T "Types";
+import U "Utils";
+
+module {
+
+object batch {
+  public func nextExpireTime() : T.Time {
+    let expiryNanos = 5 * 60 * 1000 * 1000 * 1000;
+    Time.now() + expiryNanos
+  }
+};
+
+// A batch groups the chunks being uploaded, so that they either all time out
+// together or none of them do.
+public class Batch(initBatchId: T.BatchId) {
+  public let batchId = initBatchId;
+  var expiresAt : T.Time = batch.nextExpireTime();
+
+  public func refreshExpiry() {
+    expiresAt := batch.nextExpireTime();
+  };
+
+  public func isExpired(asOf : T.Time) : Bool {
+    expiresAt <= asOf
+  };
+};
+
+// We group the staged chunks into batches. Uploading a chunk refreshes the batch's expiry timer.
+// We delete expired batches so that they don't consume space forever after an interrupted install.
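+// A batch therefore expires five minutes after its creation or after its most
+// recent chunk upload (Chunks.create calls refreshExpiry()), whichever is later.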
+public class Batches() {
+  var nextBatchId = 1;
+  let batches = HashMap.HashMap<Int, Batch>(7, Int.equal, Int.hash);
+
+  public func get(batchId: T.BatchId) : ?Batch {
+    batches.get(batchId)
+  };
+
+  public func delete(batchId: T.BatchId) {
+    batches.delete(batchId)
+  };
+
+  public func create(): Batch {
+    let batchId = nextBatchId;
+    nextBatchId += 1;
+    let batch = Batch(batchId);
+    batches.put(batchId, batch);
+    batch
+  };
+
+  public func deleteExpired() : () {
+    let now = Time.now();
+    U.deleteFromHashMap(batches, Int.equal, Int.hash, func(k: Int, batch: Batch) : Bool = batch.isExpired(now));
+  };
+
+  public func reset() {
+    nextBatchId := 1;
+    U.clearHashMap(batches);
+  }
+}
+
+}
diff --git a/src/distributed/assetstorage/Chunk.mo b/src/distributed/assetstorage/Chunk.mo
new file mode 100644
index 0000000000..ce72f40a52
--- /dev/null
+++ b/src/distributed/assetstorage/Chunk.mo
@@ -0,0 +1,59 @@
+import Debug "mo:base/Debug";
+import HashMap "mo:base/HashMap";
+import Int "mo:base/Int";
+import Result "mo:base/Result";
+import Time "mo:base/Time";
+
+import B "Batch";
+import T "Types";
+import U "Utils";
+
+module Chunk {
+
+// A chunk holds a staged piece of content until we assign it to
+// an asset by content-encoding.
+public type Chunk = {
+  batch: B.Batch;
+  content: Blob;
+};
+
+public class Chunks() {
+  var nextChunkId = 1;
+  let chunks = HashMap.HashMap<Int, Chunk>(7, Int.equal, Int.hash);
+
+  // Create a new chunk for a piece of content. This refreshes the batch's
+  // expiry timer.
+  public func create(batch: B.Batch, content: Blob) : T.ChunkId {
+    let chunkId = nextChunkId;
+    nextChunkId += 1;
+    let chunk : Chunk = {
+      batch = batch;
+      content = content;
+    };
+
+    batch.refreshExpiry();
+    chunks.put(chunkId, chunk);
+
+    chunkId
+  };
+
+  public func take(chunkId: T.ChunkId): Result.Result<Blob, Text> {
+    switch (chunks.remove(chunkId)) {
+      case null #err("chunk not found");
+      case (?chunk) #ok(chunk.content);
+    }
+  };
+
+  public func reset() {
+    nextChunkId := 1;
+    U.clearHashMap(chunks);
+  };
+
+  public func deleteExpired() : () {
+    let now = Time.now();
+
+    U.deleteFromHashMap(chunks, Int.equal, Int.hash, func(k: Int, chunk: Chunk) : Bool = chunk.batch.isExpired(now));
+  };
+}
+
+}
diff --git a/src/distributed/assetstorage/Main.mo b/src/distributed/assetstorage/Main.mo
new file mode 100644
index 0000000000..ec5cbf1699
--- /dev/null
+++ b/src/distributed/assetstorage/Main.mo
@@ -0,0 +1,400 @@
+import Array "mo:base/Array";
+import Debug "mo:base/Debug";
+import Error "mo:base/Error";
+import HashMap "mo:base/HashMap";
+import Int "mo:base/Int";
+import Iter "mo:base/Iter";
+import Nat "mo:base/Nat";
+import Nat8 "mo:base/Nat8";
+import Result "mo:base/Result";
+import Text "mo:base/Text";
+import Time "mo:base/Time";
+
+import A "Asset";
+import B "Batch";
+import C "Chunk";
+import T "Types";
+import U "Utils";
+
+
+shared ({caller = creator}) actor class () {
+
+  stable var authorized: [Principal] = [creator];
+
+  stable var stableAssets : [(T.Key, A.StableAsset)] = [];
+  let assets = HashMap.fromIter<T.Key, A.Asset>(Iter.map(stableAssets.vals(), A.toAssetEntry), 7, Text.equal, Text.hash);
+
+  let chunks = C.Chunks();
+  let batches = B.Batches();
+
+  system func preupgrade() {
+    stableAssets := Iter.toArray(Iter.map(assets.entries(), A.toStableAssetEntry));
+  };
+
+  system func postupgrade() {
+    stableAssets := [];
+  };
+
+  public shared ({ caller }) func authorize(other: Principal) : async () {
+    if (isSafe(caller)) {
+      authorized := Array.append(authorized, [other]);
+    } else {
+      throw Error.reject("not authorized");
+    }
+  };
+
+  // Retrieve an asset's contents by name. Only returns the first chunk of an asset's
+  // contents, even if there is more than one chunk.
+  // To handle larger assets, use get() and get_chunk().
+  public query func retrieve(path : T.Path) : async T.Contents {
+    switch (assets.get(path)) {
+      case null throw Error.reject("not found");
+      case (?asset) {
+        switch (asset.getEncoding("identity")) {
+          case null throw Error.reject("no identity encoding");
+          case (?encoding) encoding.content[0];
+        };
+      };
+    }
+  };
+
+  // Store an asset of limited size, with
+  //   content-type: "application/octet-stream"
+  //   content-encoding: "identity"
+  // This deprecated function is backwards-compatible with an older interface and will be replaced
+  // with a function of the same name that allows specification of the content type and encoding.
+  // Prefer to use create_batch(), create_chunk(), commit_batch().
+  public shared ({ caller }) func store(path : T.Path, contents : T.Contents) : async () {
+    if (isSafe(caller) == false) {
+      throw Error.reject("not authorized");
+    };
+
+    let batch = batches.create();
+    let chunkId = chunks.create(batch, contents);
+
+    let create_asset_args : T.CreateAssetArguments = {
+      key = path;
+      content_type = "application/octet-stream"
+    };
+    switch(createAsset(create_asset_args)) {
+      case (#ok(())) {};
+      case (#err(msg)) throw Error.reject(msg);
+    };
+
+    let args : T.SetAssetContentArguments = {
+      key = path;
+      content_encoding = "identity";
+      chunk_ids = [ chunkId ];
+    };
+    switch(setAssetContent(args)) {
+      case (#ok(())) {};
+      case (#err(msg)) throw Error.reject(msg);
+    };
+  };
+
+  func listKeys(): [T.Path] {
+    let iter = Iter.map<(Text, A.Asset), T.Path>(assets.entries(), func (key, _) = key);
+    Iter.toArray(iter)
+  };
+
+  // deprecated: the signature of this method will change, to take an empty record as
+  // a parameter and to return an array of records.
+  // For now, call keys() instead.
+  public query func list() : async [T.Path] {
+    listKeys()
+  };
+
+  // Returns an array of the keys of all assets contained in the asset canister.
+  // This method will be deprecated after the signature of list() changes.
+  public query func keys() : async [T.Path] {
+    listKeys()
+  };
+
+  func isSafe(caller: Principal) : Bool {
+    func eq(value: Principal): Bool = value == caller;
+    Array.find(authorized, eq) != null
+  };
+
+  // 1. Choose a content encoding from among the accepted encodings.
+  // 2. Return its content, or the first chunk of its content.
+  //
+  // If total_length exceeds content.size(), the caller must call get_chunk() to get
+  // the rest of the content.
+  // All chunks except the last will have the same size as the first chunk.
+  public query func get(arg:{
+    key: T.Key;
+    accept_encodings: [Text]
+  }) : async ( {
+    content: Blob;
+    content_type: Text;
+    content_encoding: Text;
+    total_length: Nat;
+  } ) {
+    switch (assets.get(arg.key)) {
+      case null throw Error.reject("asset not found");
+      case (?asset) {
+        switch (asset.chooseEncoding(arg.accept_encodings)) {
+          case null throw Error.reject("no such encoding");
+          case (?encoding) {
+            {
+              content = encoding.content[0];
+              content_type = asset.contentType;
+              content_encoding = encoding.contentEncoding;
+              total_length = encoding.totalLength;
+            }
+          }
+        };
+      };
+    };
+  };
+
+  // Get subsequent chunks of an asset encoding's content, after get().
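+  // Chunk index 0 is the first chunk, whose content get() already returned, so
+  // callers typically start at index 1; an index past the last chunk traps, so
+  // callers should stop once they have read total_length bytes.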
+  public query func get_chunk(arg:{
+    key: T.Key;
+    content_encoding: Text;
+    index: Nat;
+  }) : async ( {
+    content: Blob
+  }) {
+    switch (assets.get(arg.key)) {
+      case null throw Error.reject("asset not found");
+      case (?asset) {
+        switch (asset.getEncoding(arg.content_encoding)) {
+          case null throw Error.reject("no such encoding");
+          case (?encoding) {
+            {
+              content = encoding.content[arg.index];
+            }
+          }
+        };
+      };
+    };
+  };
+
+  // All chunks are associated with a batch until committed with commit_batch.
+  public shared ({ caller }) func create_batch(arg: {}) : async ({
+    batch_id: T.BatchId
+  }) {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    batches.deleteExpired();
+    chunks.deleteExpired();
+
+    {
+      batch_id = batches.create().batchId;
+    }
+  };
+
+  public shared ({ caller }) func create_chunk( arg: {
+    batch_id: T.BatchId;
+    content: Blob;
+  } ) : async ({
+    chunk_id: T.ChunkId
+  }) {
+    //Debug.print("create_chunk(batch " # Int.toText(arg.batch_id) # ", " # Int.toText(arg.content.size()) # " bytes)");
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    let chunkId = switch (batches.get(arg.batch_id)) {
+      case null throw Error.reject("batch not found");
+      case (?batch) chunks.create(batch, arg.content)
+    };
+
+    {
+      chunk_id = chunkId;
+    }
+  };
+
+  public shared ({ caller }) func commit_batch(args: T.CommitBatchArguments) : async () {
+    //Debug.print("commit_batch (" # Int.toText(args.operations.size()) # ")");
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    for (op in args.operations.vals()) {
+      let r : Result.Result<(), Text> = switch(op) {
+        case (#CreateAsset(args)) { createAsset(args); };
+        case (#SetAssetContent(args)) { setAssetContent(args); };
+        case (#UnsetAssetContent(args)) { unsetAssetContent(args); };
+        case (#DeleteAsset(args)) { deleteAsset(args); };
+        case (#Clear(args)) { doClear(args); }
+      };
+      switch(r) {
+        case (#ok(())) {};
+        case (#err(msg)) throw Error.reject(msg);
+      };
+    };
+    batches.delete(args.batch_id);
+  };
+
+  public shared ({ caller }) func create_asset(arg: T.CreateAssetArguments) : async () {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    switch(createAsset(arg)) {
+      case (#ok(())) {};
+      case (#err(err)) throw Error.reject(err);
+    };
+  };
+
+  func createAsset(arg: T.CreateAssetArguments) : Result.Result<(), Text> {
+    //Debug.print("createAsset(" # arg.key # ")");
+    switch (assets.get(arg.key)) {
+      case null {
+        let asset = A.Asset(
+          arg.content_type,
+          HashMap.HashMap<Text, A.AssetEncoding>(7, Text.equal, Text.hash)
+        );
+        assets.put(arg.key, asset);
+      };
+      case (?asset) {
+        if (asset.contentType != arg.content_type)
+          return #err("create_asset: content type mismatch");
+      }
+    };
+    #ok(())
+  };
+
+  public shared ({ caller }) func set_asset_content(arg: T.SetAssetContentArguments) : async () {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    switch(setAssetContent(arg)) {
+      case (#ok(())) {};
+      case (#err(err)) throw Error.reject(err);
+    };
+  };
+
+  func chunkLengthsMatch(chunks: [Blob]): Bool {
+    if (chunks.size() > 2) {
+      let expectedLength = chunks[0].size();
+      for (i in Iter.range(1, chunks.size()-2)) {
+        //Debug.print("chunk at index " # Int.toText(i) # " has length " # Int.toText(chunks[i].size()) # " and expected is " # Int.toText(expectedLength) );
+        if (chunks[i].size() != expectedLength) {
+          //Debug.print("chunk at index " # Int.toText(i) # " with length " # Int.toText(chunks[i].size()) # " does not match expected length " # Int.toText(expectedLength) );
+
+          return false;
+        }
+      };
+    };
+    true
+  };
+
+  func setAssetContent(arg: T.SetAssetContentArguments) : Result.Result<(), Text> {
+    //Debug.print("setAssetContent(" # arg.key # ")");
+    switch (assets.get(arg.key)) {
+      case null #err("asset not found");
+      case (?asset) {
+        switch (Array.mapResult(arg.chunk_ids, chunks.take)) {
+          case (#ok(chunks)) {
+            if (chunkLengthsMatch(chunks) == false) {
+              #err("chunk lengths do not match the size of the first chunk")
+            } else {
+              let encoding : A.AssetEncoding = {
+                contentEncoding = arg.content_encoding;
+                content = chunks;
+                totalLength = Array.foldLeft(chunks, 0, func (acc: Nat, blob: Blob): Nat {
+                  acc + blob.size()
+                });
+              };
+              #ok(asset.setEncoding(arg.content_encoding, encoding));
+            };
+          };
+          case (#err(err)) #err(err);
+        };
+      };
+    }
+  };
+
+  public shared ({ caller }) func unset_asset_content(args: T.UnsetAssetContentArguments) : async () {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    switch(unsetAssetContent(args)) {
+      case (#ok(())) {};
+      case (#err(err)) throw Error.reject(err);
+    };
+  };
+
+  func unsetAssetContent(args: T.UnsetAssetContentArguments) : Result.Result<(), Text> {
+    //Debug.print("unsetAssetContent(" # args.key # ")");
+    switch (assets.get(args.key)) {
+      case null #err("asset not found");
+      case (?asset) {
+        asset.unsetEncoding(args.content_encoding);
+        #ok(())
+      };
+    };
+  };
+
+  public shared ({ caller }) func delete_asset(args: T.DeleteAssetArguments) : async () {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    switch(deleteAsset(args)) {
+      case (#ok(())) {};
+      case (#err(err)) throw Error.reject(err);
+    };
+  };
+
+  func deleteAsset(args: T.DeleteAssetArguments) : Result.Result<(), Text> {
+    //Debug.print("deleteAsset(" # args.key # ")");
+    if (assets.size() > 0) { // avoid div/0 bug https://github.com/dfinity/motoko-base/issues/228
+      assets.delete(args.key);
+    };
+    #ok(())
+  };
+
+  public shared ({ caller }) func clear(args: T.ClearArguments) : async () {
+    if (isSafe(caller) == false)
+      throw Error.reject("not authorized");
+
+    switch(doClear(args)) {
+      case (#ok(())) {};
+      case (#err(err)) throw Error.reject(err);
+    };
+  };
+
+  func doClear(args: T.ClearArguments) : Result.Result<(), Text> {
+    stableAssets := [];
+    U.clearHashMap(assets);
+
+    batches.reset();
+    chunks.reset();
+
+    #ok(())
+  };
+
+  public query func http_request(request: T.HttpRequest): async T.HttpResponse {
+    let path = getPath(request.url);
+
+    let assetEncoding: ?A.AssetEncoding = switch (getAssetEncoding(path)) {
+      case null getAssetEncoding("/index.html");
+      case (?found) ?found;
+    };
+
+    switch (assetEncoding) {
+      case null {{ status_code = 404; headers = []; body = "" }};
+      case (?c) {
+        if (c.content.size() > 1)
+          throw Error.reject("asset too large");
+
+        { status_code = 200; headers = []; body = c.content[0] }
+      }
+    }
+  };
+
+  private func getPath(uri: Text): Text {
+    let splitted = Text.split(uri, #char '?');
+    let array = Iter.toArray(splitted);
+    let path = array[0];
+    path
+  };
+
+  private func getAssetEncoding(path: Text): ?A.AssetEncoding {
+    switch (assets.get(path)) {
+      case null null;
+      case (?asset) asset.getEncoding("identity");
+    }
+  };
+
+};
diff --git a/src/distributed/assetstorage/Types.mo b/src/distributed/assetstorage/Types.mo
new file mode 100644
index 0000000000..8228f3a462
--- /dev/null
+++ b/src/distributed/assetstorage/Types.mo
@@ -0,0 +1,61 @@
+module Types {
+  public type Contents = Blob;
+  public type Path = Text;
+
+  public type BatchId = Nat;
+  public type ChunkId = Nat;
+  public type Key = Text;
+  public type Time = Int;
+
+  public type CreateAssetArguments = {
+    key: Key;
+    content_type: Text;
+  };
+
+  public type SetAssetContentArguments = {
+    key: Key;
+    content_encoding: Text;
+    chunk_ids: [ChunkId]
+  };
+
+  public type UnsetAssetContentArguments = {
+    key: Key;
+    content_encoding: Text;
+  };
+
+  public type DeleteAssetArguments = {
+    key: Key;
+  };
+
+  public type ClearArguments = {
+  };
+
+  public type BatchOperationKind = {
+    #CreateAsset: CreateAssetArguments;
+    #SetAssetContent: SetAssetContentArguments;
+    #UnsetAssetContent: UnsetAssetContentArguments;
+
+    #DeleteAsset: DeleteAssetArguments;
+
+    #Clear: ClearArguments;
+  };
+
+  public type CommitBatchArguments = {
+    batch_id: BatchId;
+    operations: [BatchOperationKind];
+  };
+
+  public type HeaderField = (Text, Text);
+
+  public type HttpRequest = {
+    method: Text;
+    url: Text;
+    headers: [HeaderField];
+    body: Blob;
+  };
+  public type HttpResponse = {
+    status_code: Nat16;
+    headers: [HeaderField];
+    body: Blob;
+  };
+};
diff --git a/src/distributed/assetstorage/Utils.mo b/src/distributed/assetstorage/Utils.mo
new file mode 100644
index 0000000000..c1b6ee8e98
--- /dev/null
+++ b/src/distributed/assetstorage/Utils.mo
@@ -0,0 +1,33 @@
+import Hash "mo:base/Hash";
+import HashMap "mo:base/HashMap";
+import Int "mo:base/Int";
+import Iter "mo:base/Iter";
+
+module Utils {
+
+  public func clearHashMap<K, V>(h: HashMap.HashMap<K, V>) : () {
+    let keys = Iter.toArray(Iter.map(h.entries(), func((k: K, _: V)): K = k ));
+    for (key in keys.vals()) {
+      h.delete(key);
+    };
+  };
+
+  public func deleteFromHashMap<K, V>
+    (h: HashMap.HashMap<K, V>,
+     keyEq: (K,K) -> Bool,
+     keyHash: K -> Hash.Hash,
+     deleteFn: (K, V) -> Bool
+    ): () {
+    let entriesToDelete: HashMap.HashMap<K, V> = HashMap.mapFilter(h, keyEq, keyHash,
+      func(k: K, v: V) : ?V {
+        if (deleteFn(k, v))
+          ?v
+        else
+          null
+      }
+    );
+    for ((k,_) in entriesToDelete.entries()) {
+      h.delete(k);
+    };
+  }
+};
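
For reviewers who want to exercise the new interface by hand, the whole protocol can be driven with `dfx canister call`. The sketch below is illustrative only: the canister name `e2e_project_assets` is taken from the e2e tests, while the key, blob contents, and the ids returned by the canister are made up for the example. It stages two chunks in a batch and commits them as one asset, mirroring what the reworked installer does:

```
# 1. Open a batch; returns (record { batch_id = 1 : nat }).
dfx canister call e2e_project_assets create_batch '(record {})'

# 2. Stage chunks under that batch; each call returns (record { chunk_id = ... : nat }).
dfx canister call e2e_project_assets create_chunk '(record { batch_id = 1; content = blob "hello, " })'
dfx canister call e2e_project_assets create_chunk '(record { batch_id = 1; content = blob "world" })'

# 3. Commit: create the asset, then attach the staged chunks to its "identity" encoding.
dfx canister call e2e_project_assets commit_batch '(record {
  batch_id = 1;
  operations = vec {
    variant { CreateAsset = record { key = "/hello.txt"; content_type = "text/plain" } };
    variant { SetAssetContent = record {
      key = "/hello.txt"; content_encoding = "identity"; chunk_ids = vec { 1; 2 }
    } };
  };
})'

# 4. Read it back: get() returns the first chunk plus total_length;
#    get_chunk() serves the remaining indices.
dfx canister call --query e2e_project_assets get '(record { key = "/hello.txt"; accept_encodings = vec { "identity" } })'
dfx canister call --query e2e_project_assets get_chunk '(record { key = "/hello.txt"; content_encoding = "identity"; index = 1 })'
```

Note that the update calls succeed only for an authorized principal (the identity that created the canister, or one added via authorize()), and that an uncommitted batch expires five minutes after its last chunk upload.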