Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: allow crater runs #8171

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8738150
chore(dev-deps): always specify the version for dev-dependencies
bishopcheckmate Jan 16, 2024
36304c0
test(tower-batch-control): remove zebra-consensus dev dependency
bishopcheckmate Jan 18, 2024
119fb61
test(zebra-consensus): update ed25519 verifier tests with ones from b…
bishopcheckmate Jan 18, 2024
3710896
test(zebra-consensus): restore previous timeout values
bishopcheckmate Jan 19, 2024
d53031b
Merge branch 'main' into chore/allow-crater-runs
bishopcheckmate Jan 19, 2024
77ee350
chore: update dev-deps versions to beta.33
bishopcheckmate Jan 19, 2024
63057e0
chore(tower-batch-control): remove dev-dependency on metrics
bishopcheckmate Jan 21, 2024
230b196
chore(tower-batch-control): remove zebra-chain dev-dependency
bishopcheckmate Jan 23, 2024
5caab5c
Merge branch 'main' into chore/allow-crater-runs
arya2 Jan 24, 2024
e43dced
Merge branch 'main' into chore/allow-crater-runs
oxarbitrage Jan 24, 2024
531370c
Merge branch 'main' into chore/allow-crater-runs
arya2 Jan 26, 2024
8d34992
Merge branch 'main' into chore/allow-crater-runs
arya2 Feb 5, 2024
b2b6a41
Update zebra-scan/Cargo.toml
arya2 Feb 7, 2024
c9e2bad
Merge branch 'main' into chore/allow-crater-runs
arya2 Feb 7, 2024
8b0cfa6
Merge branch 'main' into chore/allow-crater-runs
oxarbitrage Feb 22, 2024
c559370
bump all versions to match current release
oxarbitrage Feb 22, 2024
6c49ba2
fix missed commas in version bumps
oxarbitrage Feb 22, 2024
25ff343
Merge branch 'main' into chore/allow-crater-runs
mpguerra Feb 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4674,6 +4674,7 @@ dependencies = [
"ed25519-zebra",
"futures",
"futures-core",
"metrics 0.22.0",
"pin-project",
"rand 0.8.5",
"rayon",
Expand All @@ -4686,7 +4687,7 @@ dependencies = [
"tower-test",
"tracing",
"tracing-futures",
"zebra-consensus",
"zebra-chain",
"zebra-test",
]

Expand Down
7 changes: 4 additions & 3 deletions tower-batch-control/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing-futures = "0.2.5"

[dev-dependencies]
color-eyre = "0.6.2"
metrics = "0.22.0"
# This is a transitive dependency via color-eyre.
# Enable a feature that makes tinyvec compile much faster.
tinyvec = { version = "1.6.0", features = ["rustc_1_55"] }
Expand All @@ -43,8 +44,8 @@ rand = "0.8.5"

tokio = { version = "1.35.1", features = ["full", "tracing", "test-util"] }
tokio-test = "0.4.3"
tower-fallback = { path = "../tower-fallback/" }
tower-fallback = { path = "../tower-fallback/", version = "0.2.41-beta.8" }
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
tower-test = "0.4.0"

zebra-consensus = { path = "../zebra-consensus/" }
zebra-test = { path = "../zebra-test/" }
zebra-chain = { path = "../zebra-chain/", version = "1.0.0-beta.32" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.32" }
183 changes: 165 additions & 18 deletions tower-batch-control/tests/ed25519.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,175 @@
//! Test batching using ed25519 verification.

use std::time::Duration;
use std::{
mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*;
use futures::stream::{FuturesOrdered, StreamExt};
use futures::FutureExt;
use futures_core::Future;
use rand::thread_rng;
use tokio::sync::{oneshot::error::RecvError, watch};
use tower::{Service, ServiceExt};
use tower_batch_control::Batch;
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::batch;

// ============ service impl ============

use zebra_consensus::ed25519::{Item as Ed25519Item, Verifier as Ed25519Verifier};
/// A boxed [`std::error::Error`].
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;

/// The type of verification results.
type VerifyResult = Result<(), Error>;

/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;

/// The type of the batch item.
/// This is an `Ed25519Item`.
type Item = batch::Item;

/// Ed25519 signature verifier service
struct Verifier {
/// A batch verifier for ed25519 signatures.
batch: BatchVerifier,

/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}

impl Default for Verifier {
fn default() -> Self {
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}

impl Verifier {
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);

let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);

(batch, tx)
}

/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}

/// Flush the batch using a thread pool, and return the result via the channel.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();

// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}

/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}
}

impl Service<BatchControl<Item>> for Verifier {
arya2 marked this conversation as resolved.
Show resolved Hide resolved
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got ed25519 item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();

Box::pin(async move {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

if result.is_ok() {
tracing::trace!(?result, "validated ed25519 signature");
metrics::counter!("signatures.ed25519.validated").increment(1);
} else {
tracing::trace!(?result, "invalid ed25519 signature");
metrics::counter!("signatures.ed25519.invalid").increment(1);
}
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
}
})
}

BatchControl::Flush => {
tracing::trace!("got ed25519 flush command");

let (batch, tx) = self.take();

Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
}

impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This returns immediately, usually before the batch is completed.
self.flush_blocking();
}
}

/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel.
async fn spawn_fifo<
E: 'static + std::error::Error + Sync + Send,
F: 'static + FnOnce() -> Result<(), E> + Send,
>(
f: F,
) -> Result<Result<(), E>, RecvError> {
// Rayon doesn't have a spawn function that returns a value,
// so we use a oneshot channel instead.
let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();

rayon::spawn_fifo(move || {
let _ = rsp_tx.send(f());
});

rsp_rx.await
}

// =============== testing code ========

Expand All @@ -22,7 +179,7 @@ async fn sign_and_verify<V>(
bad_index: Option<usize>,
) -> Result<(), V::Error>
where
V: Service<Ed25519Item, Response = ()>,
V: Service<Item, Response = ()>,
{
let mut results = FuturesOrdered::new();
for i in 0..n {
Expand Down Expand Up @@ -61,7 +218,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
// flushing is happening based on hitting max_items.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000));
let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
.await
.map_err(|e| eyre!(e))?
Expand All @@ -79,12 +236,7 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
// flushing is happening based on hitting max_latency.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(
Ed25519Verifier::default(),
100,
10,
Duration::from_millis(500),
);
let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
.await
.map_err(|e| eyre!(e))?
Expand All @@ -99,13 +251,8 @@ async fn fallback_verification() -> Result<(), Report> {

// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Fallback::new(
Batch::new(
Ed25519Verifier::default(),
10,
1,
Duration::from_millis(100),
),
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
Batch::new(Verifier::default(), 10, 1, Duration::from_millis(100)),
tower::service_fn(|item: Item| async move { item.verify_single() }),
);

sign_and_verify(verifier, 100, Some(39))
Expand Down
2 changes: 1 addition & 1 deletion tower-fallback/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ tracing = "0.1.39"
[dev-dependencies]
tokio = { version = "1.35.1", features = ["full", "tracing", "test-util"] }

zebra-test = { path = "../zebra-test/" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.32" }
2 changes: 1 addition & 1 deletion zebra-chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ rand_chacha = "0.3.1"

tokio = { version = "1.35.1", features = ["full", "tracing", "test-util"] }

zebra-test = { path = "../zebra-test/" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.32" }

[[bench]]
name = "block"
Expand Down
6 changes: 3 additions & 3 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ tokio = { version = "1.35.1", features = ["full", "tracing", "test-util"] }
tracing-error = "0.2.0"
tracing-subscriber = "0.3.18"

zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test/" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.32", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.32", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.32" }
Loading
Loading