Add blob size limits. (#2705)
* Add a blob size limit.

* Add a bytecode size limit.

* Add unit tests for limits.

* Don't enforce the limit for already published bytecode.

* Simplify LimitedWriter; add unit test.

* Add decompressed_size_at_most.

* Update and copy comment about #2710.
afck authored Oct 26, 2024
1 parent 529e108 commit a0003ce
Showing 16 changed files with 257 additions and 33 deletions.
6 changes: 5 additions & 1 deletion CLI.md
@@ -411,7 +411,9 @@ View or update the resource control policy
* `--message <MESSAGE>` — Set the base price of sending a message from a block.
* `--message-byte <MESSAGE_BYTE>` — Set the additional price for each byte in the argument of a user message
* `--maximum-fuel-per-block <MAXIMUM_FUEL_PER_BLOCK>` — Set the maximum amount of fuel per block
* `--maximum-executed-block-size <MAXIMUM_EXECUTED_BLOCK_SIZE>` — Set the maximum size of an executed block
* `--maximum-executed-block-size <MAXIMUM_EXECUTED_BLOCK_SIZE>` — Set the maximum size of an executed block, in bytes
* `--maximum-blob-size <MAXIMUM_BLOB_SIZE>` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes
* `--maximum-bytecode-size <MAXIMUM_BYTECODE_SIZE>` — Set the maximum size of decompressed contract or service bytecode, in bytes
* `--maximum-bytes-read-per-block <MAXIMUM_BYTES_READ_PER_BLOCK>` — Set the maximum read data per block
* `--maximum-bytes-written-per-block <MAXIMUM_BYTES_WRITTEN_PER_BLOCK>` — Set the maximum write data per block

@@ -473,6 +475,8 @@ Create genesis configuration for a Linera deployment. Create initial user chains
Default value: `0`
* `--maximum-fuel-per-block <MAXIMUM_FUEL_PER_BLOCK>` — Set the maximum amount of fuel per block
* `--maximum-executed-block-size <MAXIMUM_EXECUTED_BLOCK_SIZE>` — Set the maximum size of an executed block
* `--maximum-bytecode-size <MAXIMUM_BYTECODE_SIZE>` — Set the maximum size of decompressed contract or service bytecode, in bytes
* `--maximum-blob-size <MAXIMUM_BLOB_SIZE>` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes
* `--maximum-bytes-read-per-block <MAXIMUM_BYTES_READ_PER_BLOCK>` — Set the maximum read data per block
* `--maximum-bytes-written-per-block <MAXIMUM_BYTES_WRITTEN_PER_BLOCK>` — Set the maximum write data per block
* `--testing-prng-seed <TESTING_PRNG_SEED>` — Force this wallet to generate keys using a PRNG and a given seed. USE FOR TESTING ONLY
57 changes: 42 additions & 15 deletions linera-base/src/data_types.rs
@@ -37,6 +37,7 @@ use crate::{
ApplicationId, BlobId, BlobType, BytecodeId, Destination, GenericApplicationId, MessageId,
UserApplicationId,
},
limited_writer::{LimitedWriter, LimitedWriterError},
time::{Duration, SystemTime},
};

@@ -861,14 +862,8 @@ impl fmt::Debug for Bytecode {
#[derive(Error, Debug)]
pub enum DecompressionError {
/// Compressed bytecode is invalid, and could not be decompressed.
#[cfg(not(target_arch = "wasm32"))]
#[error("Bytecode could not be decompressed")]
InvalidCompressedBytecode(#[source] io::Error),

/// Compressed bytecode is invalid, and could not be decompressed.
#[cfg(target_arch = "wasm32")]
#[error("Bytecode could not be decompressed")]
InvalidCompressedBytecode(#[from] ruzstd::frame_decoder::FrameDecoderError),
#[error("Bytecode could not be decompressed: {0}")]
InvalidCompressedBytecode(#[from] io::Error),
}

/// A compressed WebAssembly module's bytecode.
@@ -880,20 +875,53 @@ pub struct CompressedBytecode {
pub compressed_bytes: Vec<u8>,
}

#[cfg(not(target_arch = "wasm32"))]
impl CompressedBytecode {
/// Returns `true` if the decompressed size does not exceed the limit.
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
let mut decoder = zstd::stream::Decoder::new(&*self.compressed_bytes)?;
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
let mut writer = LimitedWriter::new(io::sink(), limit);
match io::copy(&mut decoder, &mut writer) {
Ok(_) => Ok(true),
Err(error) => {
error.downcast::<LimitedWriterError>()?;
Ok(false)
}
}
}

/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
#[cfg(not(target_arch = "wasm32"))]
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
#[cfg(with_metrics)]
let _decompression_latency = BYTECODE_DECOMPRESSION_LATENCY.measure_latency();
let bytes = zstd::stream::decode_all(&*self.compressed_bytes)
.map_err(DecompressionError::InvalidCompressedBytecode)?;
let bytes = zstd::stream::decode_all(&*self.compressed_bytes)?;

Ok(Bytecode { bytes })
}
}

#[cfg(target_arch = "wasm32")]
impl CompressedBytecode {
/// Returns `true` if the decompressed size does not exceed the limit.
pub fn decompressed_size_at_most(&self, limit: u64) -> Result<bool, DecompressionError> {
let compressed_bytes = &*self.compressed_bytes;
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
let mut writer = LimitedWriter::new(io::sink(), limit);
let mut decoder = ruzstd::streaming_decoder::StreamingDecoder::new(compressed_bytes)
.map_err(io::Error::other)?;

// TODO(#2710): Decode multiple frames, if present
match io::copy(&mut decoder, &mut writer) {
Ok(_) => Ok(true),
Err(error) => {
error.downcast::<LimitedWriterError>()?;
Ok(false)
}
}
}

/// Decompresses a [`CompressedBytecode`] into a [`Bytecode`].
#[cfg(target_arch = "wasm32")]
pub fn decompress(&self) -> Result<Bytecode, DecompressionError> {
use ruzstd::{io::Read, streaming_decoder::StreamingDecoder};

@@ -902,10 +930,9 @@ impl CompressedBytecode {

let compressed_bytes = &*self.compressed_bytes;
let mut bytes = Vec::new();
let mut decoder = StreamingDecoder::new(compressed_bytes)?;
let mut decoder = StreamingDecoder::new(compressed_bytes).map_err(io::Error::other)?;

// Decode multiple frames, if present
// (https://github.com/KillingSpark/zstd-rs/issues/57)
// TODO(#2710): Decode multiple frames, if present
while !decoder.get_ref().is_empty() {
decoder
.read_to_end(&mut bytes)
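Together, the native and Wasm `impl` blocks expose the same two-step API: a `decompressed_size_at_most` check that streams the decoder's output into `io::sink()` through a `LimitedWriter`, so the size test never allocates memory proportional to the decompressed bytecode, followed by a real `decompress` once the blob is accepted. A minimal caller-side sketch (not part of this diff, assuming `linera-base` as a dependency):

```rust
use linera_base::data_types::{Bytecode, CompressedBytecode, DecompressionError};

/// Decompresses `compressed` only if the output fits in `limit` bytes;
/// returns `Ok(None)` when the limit would be exceeded.
fn decompress_checked(
    compressed: &CompressedBytecode,
    limit: u64,
) -> Result<Option<Bytecode>, DecompressionError> {
    if compressed.decompressed_size_at_most(limit)? {
        Ok(Some(compressed.decompress()?))
    } else {
        Ok(None)
    }
}
```

The check decompresses once and discards the output, so a caller that goes on to call `decompress` pays the CPU cost twice; in this commit the check runs on the validator side, where rejecting oversized bytecode early is the point.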
1 change: 1 addition & 0 deletions linera-base/src/lib.rs
@@ -18,6 +18,7 @@ pub mod crypto;
pub mod data_types;
mod graphql;
pub mod identifiers;
mod limited_writer;
pub mod ownership;
#[cfg(not(target_arch = "wasm32"))]
pub mod port;
68 changes: 68 additions & 0 deletions linera-base/src/limited_writer.rs
@@ -0,0 +1,68 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::io::{self, Write};

use thiserror::Error;

use crate::ensure;

#[derive(Error, Debug)]
#[error("Writer limit exceeded")]
pub struct LimitedWriterError;

/// Custom writer that enforces a byte limit.
pub struct LimitedWriter<W: Write> {
inner: W,
limit: usize,
written: usize,
}

impl<W: Write> LimitedWriter<W> {
pub fn new(inner: W, limit: usize) -> Self {
Self {
inner,
limit,
written: 0,
}
}
}

impl<W: Write> Write for LimitedWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// Calculate the number of bytes we can write without exceeding the limit.
// Fail if the buffer doesn't fit.
ensure!(
self.limit
.checked_sub(self.written)
.is_some_and(|remaining| buf.len() <= remaining),
io::Error::other(LimitedWriterError)
);
// Forward to the inner writer.
let n = self.inner.write(buf)?;
self.written += n;
Ok(n)
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_limited_writer() {
let mut out_buffer = Vec::new();
let mut writer = LimitedWriter::new(&mut out_buffer, 5);
assert_eq!(writer.write(b"foo").unwrap(), 3);
assert_eq!(writer.write(b"ba").unwrap(), 2);
assert!(writer
.write(b"r")
.unwrap_err()
.downcast::<LimitedWriterError>()
.is_ok());
}
}
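Note the design choice in `write`: a call that would overflow the limit fails outright instead of truncating, so a single failed call is enough to abort an `io::copy`. A crate-internal sketch of the resulting control flow, mirroring how `decompressed_size_at_most` above consumes a decoder (the helper name is hypothetical):

```rust
use std::io::{self, Read};

use crate::limited_writer::{LimitedWriter, LimitedWriterError};

/// Returns `Ok(true)` if everything `reader` produces fits in `limit` bytes.
fn fits_within(mut reader: impl Read, limit: usize) -> io::Result<bool> {
    let mut writer = LimitedWriter::new(io::sink(), limit);
    match io::copy(&mut reader, &mut writer) {
        Ok(_) => Ok(true),
        // The limit violation arrives as an `io::Error` wrapping
        // `LimitedWriterError`; downcasting separates "too big" (mapped to
        // `Ok(false)`) from genuine I/O failures, which propagate as errors.
        Err(error) => {
            error.downcast::<LimitedWriterError>()?;
            Ok(false)
        }
    }
}
```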
2 changes: 1 addition & 1 deletion linera-chain/src/unit_tests/chain_tests.rs
@@ -110,7 +110,7 @@ async fn test_block_size_limit() {
let mut chain = ChainStateView::new(chain_id).await;

// The size of the executed valid block below.
let maximum_executed_block_size = 675;
let maximum_executed_block_size = 691;

// Initialize the chain.
let mut config = make_open_chain_config();
20 changes: 19 additions & 1 deletion linera-client/src/client_options.rs
@@ -517,10 +517,19 @@ pub enum ClientCommand {
#[arg(long)]
maximum_fuel_per_block: Option<u64>,

/// Set the maximum size of an executed block.
/// Set the maximum size of an executed block, in bytes.
#[arg(long)]
maximum_executed_block_size: Option<u64>,

/// Set the maximum size of data blobs, compressed bytecode and other binary blobs,
/// in bytes.
#[arg(long)]
maximum_blob_size: Option<u64>,

/// Set the maximum size of decompressed contract or service bytecode, in bytes.
#[arg(long)]
maximum_bytecode_size: Option<u64>,

/// Set the maximum read data per block.
#[arg(long)]
maximum_bytes_read_per_block: Option<u64>,
@@ -635,6 +644,15 @@ pub enum ClientCommand {
#[arg(long)]
maximum_executed_block_size: Option<u64>,

/// Set the maximum size of decompressed contract or service bytecode, in bytes.
#[arg(long)]
maximum_bytecode_size: Option<u64>,

/// Set the maximum size of data blobs, compressed bytecode and other binary blobs,
/// in bytes.
#[arg(long)]
maximum_blob_size: Option<u64>,

/// Set the maximum read data per block.
#[arg(long)]
maximum_bytes_read_per_block: Option<u64>,
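Both subcommands feed these flags into the corresponding `ResourceControlPolicy` fields. A hedged sketch of the resulting struct-update pattern, the same one the unit tests below use (the `linera_execution` import path and the numeric values are illustrative assumptions):

```rust
use linera_execution::ResourceControlPolicy;

fn example_policy() -> ResourceControlPolicy {
    ResourceControlPolicy {
        // Limit on a blob as stored: raw data bytes, or compressed bytecode.
        maximum_blob_size: 1_000_000,
        // Limit on contract or service bytecode after decompression.
        maximum_bytecode_size: 10_000_000,
        ..ResourceControlPolicy::default()
    }
}
```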
23 changes: 22 additions & 1 deletion linera-core/src/chain_worker/state/temporary_changes.rs
@@ -6,7 +6,7 @@
use std::borrow::Cow;

use linera_base::{
data_types::{ArithmeticError, Timestamp, UserApplicationDescription},
data_types::{ArithmeticError, BlobContent, Timestamp, UserApplicationDescription},
ensure,
identifiers::{GenericApplicationId, UserApplicationId},
};
@@ -179,6 +179,8 @@ where
.current_committee()
.expect("chain is active");
check_block_epoch(epoch, block)?;
let maximum_blob_size = committee.policy().maximum_blob_size;
let maximum_bytecode_size = committee.policy().maximum_bytecode_size;
// Check the authentication of the block.
let public_key = self
.0
@@ -213,6 +215,25 @@
for blob in blobs {
self.0.cache_recent_blob(Cow::Borrowed(blob)).await;
}
for blob in self.0.get_blobs(block.published_blob_ids()).await? {
let blob_size = match blob.content() {
BlobContent::Data(bytes) => bytes.len(),
BlobContent::ContractBytecode(compressed_bytecode)
| BlobContent::ServiceBytecode(compressed_bytecode) => {
ensure!(
compressed_bytecode.decompressed_size_at_most(maximum_bytecode_size)?,
WorkerError::BytecodeTooLarge
);
compressed_bytecode.compressed_bytes.len()
}
};
ensure!(
u64::try_from(blob_size)
.ok()
.is_some_and(|size| size <= maximum_blob_size),
WorkerError::BlobTooLarge
)
}

let local_time = self.0.storage.clock().current_time();
ensure!(
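In short, every published blob must fit within `maximum_blob_size` as stored (compressed, in the bytecode case), and bytecode blobs must additionally decompress to at most `maximum_bytecode_size`. A condensed, stand-alone restatement of that rule, substituting `anyhow` errors for the worker's `WorkerError::BytecodeTooLarge` and `WorkerError::BlobTooLarge`:

```rust
use anyhow::{ensure, Result};
use linera_base::data_types::BlobContent;

/// Accepts a blob only if it satisfies both limits checked above.
fn check_blob_size(
    content: &BlobContent,
    maximum_blob_size: u64,
    maximum_bytecode_size: u64,
) -> Result<()> {
    let stored_len = match content {
        // Data blobs only have to respect the blob size limit.
        BlobContent::Data(bytes) => bytes.len(),
        // Bytecode blobs must also stay under the decompressed-size limit.
        BlobContent::ContractBytecode(code) | BlobContent::ServiceBytecode(code) => {
            ensure!(
                code.decompressed_size_at_most(maximum_bytecode_size)?,
                "decompressed bytecode exceeds {maximum_bytecode_size} bytes"
            );
            code.compressed_bytes.len()
        }
    };
    // The blob limit applies to the bytes as stored.
    ensure!(
        u64::try_from(stored_len).is_ok_and(|size| size <= maximum_blob_size),
        "blob exceeds {maximum_blob_size} bytes"
    );
    Ok(())
}
```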
14 changes: 12 additions & 2 deletions linera-core/src/unit_tests/client_tests.rs
@@ -2413,7 +2413,15 @@ async fn test_propose_block_with_messages_and_blobs<B>(storage_builder: B) -> an
where
B: StorageBuilder,
{
let mut builder = TestBuilder::new(storage_builder, 4, 0).await?;
let blob_bytes = b"blob".to_vec();
let large_blob_bytes = b"blob+".to_vec();
let policy = ResourceControlPolicy {
maximum_blob_size: blob_bytes.len() as u64,
..ResourceControlPolicy::default()
};
let mut builder = TestBuilder::new(storage_builder, 4, 0)
.await?
.with_policy(policy.clone());
let description1 = ChainDescription::Root(1);
let description2 = ChainDescription::Root(2);
let description3 = ChainDescription::Root(3);
@@ -2426,7 +2434,6 @@
builder.set_fault_type([3], FaultType::Offline).await;

// Publish a blob on chain 1.
let blob_bytes = b"blob".to_vec();
let blob_id = BlobId::new(
CryptoHash::new(&BlobBytes(blob_bytes.clone())),
BlobType::Data,
@@ -2458,5 +2465,8 @@
assert_eq!(executed_block.block.incoming_bundles.len(), 1);
assert_eq!(executed_block.required_blob_ids().len(), 1);

let result = client1.publish_data_blob(large_blob_bytes).await;
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));

Ok(())
}
40 changes: 31 additions & 9 deletions linera-core/src/unit_tests/wasm_client_tests.rs
@@ -41,7 +41,10 @@ use crate::client::client_tests::RocksDbStorageBuilder;
use crate::client::client_tests::ScyllaDbStorageBuilder;
#[cfg(feature = "storage-service")]
use crate::client::client_tests::ServiceStorageBuilder;
use crate::client::client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder};
use crate::client::{
client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder},
ChainClientError,
};

#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
@@ -90,24 +93,31 @@ async fn run_test_create_application<B>(storage_builder: B) -> anyhow::Result<()
where
B: StorageBuilder,
{
let (contract_path, service_path) =
linera_execution::wasm_test::get_example_bytecode_paths("counter")?;
let contract_bytecode = Bytecode::load_from_file(contract_path).await?;
let service_bytecode = Bytecode::load_from_file(service_path).await?;
let contract_compressed_len = contract_bytecode.compress().compressed_bytes.len();
let service_compressed_len = service_bytecode.compress().compressed_bytes.len();

let mut policy = ResourceControlPolicy::all_categories();
policy.maximum_bytecode_size = contract_bytecode
.bytes
.len()
.max(service_bytecode.bytes.len()) as u64;
policy.maximum_blob_size = contract_compressed_len.max(service_compressed_len) as u64;
let mut builder = TestBuilder::new(storage_builder, 4, 1)
.await?
.with_policy(ResourceControlPolicy::all_categories());
.with_policy(policy.clone());
let publisher = builder
.add_initial_chain(ChainDescription::Root(0), Amount::from_tokens(3))
.await?;
let creator = builder
.add_initial_chain(ChainDescription::Root(1), Amount::ONE)
.await?;

let (contract_path, service_path) =
linera_execution::wasm_test::get_example_bytecode_paths("counter")?;

let (bytecode_id, _cert) = publisher
.publish_bytecode(
Bytecode::load_from_file(contract_path).await?,
Bytecode::load_from_file(service_path).await?,
)
.publish_bytecode(contract_bytecode, service_bytecode)
.await
.unwrap()
.unwrap();
@@ -148,6 +158,18 @@
let balance_after_init = creator.local_balance().await?;
assert!(balance_after_init < balance_after_messaging);

let large_bytecode = Bytecode::new(vec![0; policy.maximum_bytecode_size as usize + 1]);
let small_bytecode = Bytecode::new(vec![]);
// Publishing bytecode that exceeds the limit fails.
let result = publisher
.publish_bytecode(large_bytecode.clone(), small_bytecode.clone())
.await;
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));
let result = publisher
.publish_bytecode(small_bytecode, large_bytecode)
.await;
assert_matches!(result, Err(ChainClientError::LocalNodeError(_)));

Ok(())
}
