Skip to content

Commit

Permalink
Update transport library according to the RFC (#137)
Browse files Browse the repository at this point in the history
Adapting the protocol for several breaking changes:
- logs are pulled instead of pushed,
- scheduler doesn't actively participate in the network,
- pings (renamed to heartbeats) reference assignment files to minimize their size,
- message signatures are updated to support verifying executed queries.

https://github.com/subsquid/specs/tree/main/network-rfc
  • Loading branch information
kalabukdima committed Dec 12, 2024
1 parent db048cb commit f0033c1
Show file tree
Hide file tree
Showing 34 changed files with 1,549 additions and 1,003 deletions.
481 changes: 444 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/bootnode/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let cli = Cli::parse();
let listen_addrs = cli.transport.listen_addrs();
let keypair = get_keypair(cli.transport.key).await?;
let keypair = get_keypair(Some(cli.transport.key)).await?;
let local_peer_id = PeerId::from(keypair.public());
log::info!("Local peer ID: {local_peer_id}");

Expand Down
7 changes: 4 additions & 3 deletions crates/contract-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
[package]
name = "sqd-contract-client"
license = "AGPL-3.0-or-later"
version = "1.2.0"
version = "1.2.1"
edition = "2021"

[dependencies]

async-trait = "0.1"
clap = { version = "4", features = ["derive", "env"] }
ethers = { version = "2", features = ["ws"] }
futures = "0.3"
libp2p = { workspace = true }
log = "0.4"
num-rational = "0.4"
serde = "1"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1"
url = "2"
num-rational = "0.4"

[dev-dependencies]
anyhow = "1"
Expand Down
39 changes: 24 additions & 15 deletions crates/contract-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ use clap::{Args, ValueEnum};

use crate::Address;

#[derive(Args)]
#[derive(Args, Clone)]
pub struct RpcArgs {
#[arg(
long,
env,
help = "Blockchain RPC URL",
default_value = "http://127.0.0.1:8545/"
)]
/// Blockchain RPC URL
#[arg(long, env)]
pub rpc_url: String,
#[arg(
long,
env,
help = "Layer 1 blockchain RPC URL. If not provided, rpc_url is assumed to be L1"
)]
pub l1_rpc_url: Option<String>,

/// Layer 1 blockchain RPC URL
#[arg(long, env)]
pub l1_rpc_url: String,

#[command(flatten)]
contract_addrs: ContractAddrs,
#[arg(long, env, help = "Network to connect to (mainnet or testnet)")]

/// Network to connect to (mainnet or testnet)
#[arg(long, env, default_value = "mainnet")]
pub network: Network,

#[arg(env, hide(true), default_value_t = 500)]
pub contract_workers_per_page: usize,
}

impl RpcArgs {
Expand Down Expand Up @@ -55,7 +55,7 @@ impl RpcArgs {
}
}

#[derive(Args)]
#[derive(Args, Clone)]
pub struct ContractAddrs {
#[arg(long, env)]
pub gateway_registry_contract_addr: Option<Address>,
Expand Down Expand Up @@ -114,3 +114,12 @@ impl Network {
}
}
}

impl std::fmt::Display for Network {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tethys => write!(f, "tethys"),
Self::Mainnet => write!(f, "mainnet"),
}
}
}
45 changes: 23 additions & 22 deletions crates/contract-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ pub trait Client: Send + Sync + 'static {
) -> Result<Vec<Allocation>, ClientError>;

/// Get the number of compute units available for the portal in the current epoch
async fn portal_compute_units_per_epoch(&self, portal_id: PeerId)
-> Result<u64, ClientError>;
async fn portal_compute_units_per_epoch(&self, portal_id: PeerId) -> Result<u64, ClientError>;

/// Check if the portal uses the default strategy — allocates CUs evenly among workers
async fn portal_uses_default_strategy(&self, portal_id: PeerId) -> Result<bool, ClientError>;
Expand Down Expand Up @@ -177,19 +176,13 @@ pub trait Client: Send + Sync + 'static {

pub async fn get_client(rpc_args: &RpcArgs) -> Result<Box<dyn Client>, ClientError> {
log::info!(
"Initializing contract client. network={:?} rpc_url={} l1_rpc_url={:?}",
"Initializing contract client. network={:?} rpc_url={} l1_rpc_url={}",
rpc_args.network,
rpc_args.rpc_url,
rpc_args.l1_rpc_url
);
let l2_client = Transport::connect(&rpc_args.rpc_url).await?;
let l1_client = match &rpc_args.l1_rpc_url {
Some(rpc_url) => Transport::connect(rpc_url).await?,
None => {
log::warn!("Layer 1 RPC URL not provided. Assuming the main RPC URL is L1");
l2_client.clone()
}
};
let l1_client = Transport::connect(&rpc_args.l1_rpc_url).await?;
let client: Box<dyn Client> = EthersClient::new(l1_client, l2_client, rpc_args).await?;
Ok(client)
}
Expand All @@ -204,6 +197,7 @@ struct EthersClient {
allocations_viewer: AllocationsViewer<Provider<Transport>>,
default_strategy_addr: Address,
multicall_contract_addr: Option<Address>,
active_workers_per_page: usize,
}

impl EthersClient {
Expand Down Expand Up @@ -236,6 +230,7 @@ impl EthersClient {
allocations_viewer,
default_strategy_addr,
multicall_contract_addr: Some(rpc_args.multicall_addr()),
active_workers_per_page: rpc_args.contract_workers_per_page,
}))
}

Expand Down Expand Up @@ -308,16 +303,25 @@ impl Client for EthersClient {
}

async fn active_workers(&self) -> Result<Vec<Worker>, ClientError> {
let workers_call = self.worker_registration.method("getActiveWorkers", ())?;
let onchain_ids_call = self.worker_registration.method("getActiveWorkerIds", ())?;
let mut multicall = self.multicall().await?;
multicall
.add_call::<Vec<contracts::Worker>>(workers_call, false)
.add_call::<Vec<U256>>(onchain_ids_call, false);
let (workers, onchain_ids): (Vec<contracts::Worker>, Vec<U256>) = multicall.call().await?;
// A single getActiveWorkers call should be used instead but it lacks pagination and runs out of gas

let onchain_ids: Vec<U256> =
self.worker_registration.get_active_worker_ids().call().await?;
let calls = onchain_ids.chunks(self.active_workers_per_page).map(|ids| async move {
let mut multicall = self.multicall().await?;
for id in ids {
multicall.add_call::<contracts::Worker>(
self.worker_registration.method("getWorker", *id)?,
false,
);
}
let workers: Vec<contracts::Worker> = multicall.call_array().await?;
Result::<_, ClientError>::Ok(workers)
});

let workers = futures::future::try_join_all(calls).await?.into_iter().flatten();

let workers = workers
.into_iter()
.zip(onchain_ids)
.filter_map(|(worker, onchain_id)| match Worker::new(&worker, onchain_id) {
Ok(worker) => Some(worker),
Expand Down Expand Up @@ -431,10 +435,7 @@ impl Client for EthersClient {
.collect())
}

async fn portal_compute_units_per_epoch(
&self,
portal_id: PeerId,
) -> Result<u64, ClientError> {
async fn portal_compute_units_per_epoch(&self, portal_id: PeerId) -> Result<u64, ClientError> {
let id: Bytes = portal_id.to_bytes().into();
let cus = self.gateway_registry.computation_units_available(id).call().await?;
Ok(cus.try_into().expect("Computation units should not exceed u64 range"))
Expand Down
22 changes: 21 additions & 1 deletion crates/messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,38 @@ version = "2.0.0"
edition = "2021"

[features]
assignment_reader = ["anyhow", "crypto_box", "flate2", "reqwest", "serde_json", "serde_with", "sha2", "tokio"]
assignment_writer = ["anyhow", "base64", "bs58", "crypto_box", "curve25519-dalek", "hmac", "log", "serde_json", "serde_with", "sha2", "url"]
bitstring = ["bytemuck", "flate2"]
signatures = ["libp2p"]

[dependencies]
anyhow = "1"
anyhow = { version = "1.0", optional = true }
base64 = { version = "0.22.1", optional = true }
bs58 = { version = "0.5.1", optional = true }
bytemuck = { version = "1.19.0", optional = true, features = ["extern_crate_alloc"] }
crypto_box = { version = "0.9.1", optional = true }
curve25519-dalek = { version = "4.1.3", optional = true }
flate2 = { version = "1.0.30", optional = true }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12.1", optional = true }
log = { version = "0.4", optional = true }
prost = "0.13"
reqwest = { version = "0.12.4", optional = true, features = ["json"] }
semver = { version = "1", optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1.0.111", optional = true, features = ["preserve_order"] }
serde_with = { version = "3.11.0", optional = true, features = ["base64"] }
sha2 = { version = "0.10.8", optional = true }
sha3 = "0.10"
tokio = { version = "1.38", optional = true }
url = { version = "2.5.0", optional = true }

libp2p = { workspace = true, optional = true }

[dev-dependencies]
sqd-network-transport = { path = "../transport" }

[build-dependencies]
prost-build = "0.12"

Expand Down
4 changes: 2 additions & 2 deletions crates/messages/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ fn main() -> std::io::Result<()> {
.type_attribute(".", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute("messages.Range", "#[derive(Copy, Ord, PartialOrd)]")
.skip_debug(["messages.QueryOk"])
.field_attribute("messages.QueryResultSummary.hash", "#[serde(with = \"hex\")]")
.field_attribute("messages.QueryOkSummary.data_hash", "#[serde(with = \"hex\")]")
.field_attribute("messages.Pong.ping_hash", "#[serde(with = \"hex\")]")
.field_attribute("messages.Ping.signature","#[serde(with = \"hex\")]")
.field_attribute("messages.OldPing.signature","#[serde(with = \"hex\")]")
.field_attribute("messages.Query.signature", "#[serde(with = \"hex\")]")
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/messages.proto"], &["proto/"])?;
Expand Down
74 changes: 4 additions & 70 deletions crates/messages/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,13 @@ message Range {
}

message RangeSet {
option deprecated = true;

repeated Range ranges = 1;
}

message WorkerState {
option deprecated = true;
map<string, RangeSet> datasets = 1;
}

message DatasetRanges {
option deprecated = true;

string url = 1;
repeated Range ranges = 2;
}

message BitString {
bytes data = 1;
uint64 size = 2;
bytes data = 1; // deflate-compressed bytes with each one being 0x00 or 0x01
uint64 size = 2; // number of total bits in the original bitstring
uint64 ones = 3; // number of ones in the original bitstring
}

// Worker -> Scheduler, Portal, Ping collector
Expand All @@ -39,59 +26,6 @@ message Heartbeat {
optional uint64 stored_bytes = 4;
}

// Worker -> Scheduler, Portal, Ping collector
message OldPing {
option deprecated = true;

optional string worker_id = 1;
optional string version = 2;
optional uint64 stored_bytes = 3;
repeated DatasetRanges stored_ranges = 4;
bytes signature = 5;
}

message HttpHeader {
option deprecated = true;

string name = 1;
string value = 2;
}

message AssignedChunk {
option deprecated = true;

string path = 1; // "0000000000/0000808640-0000816499-b0486318"
repeated uint32 filenames = 2; // index in known_filenames array
}

message DatasetChunks {
option deprecated = true;

string dataset_id = 1; // "s3://moonbeam-evm-1"
string download_url = 3; // "https://moonbeam-evm-1.sqd-datasets.io/"
repeated AssignedChunk chunks = 4;
}

message WorkerAssignment {
option deprecated = true;

repeated DatasetChunks dataset_chunks = 1;
repeated HttpHeader http_headers = 2;
repeated string known_filenames = 3; // "blocks.parquet"
}

// Scheduler -> Worker
message Pong {
option deprecated = true;

bytes ping_hash = 1;
oneof status {
google.protobuf.Empty unsupported_version = 3;
string jailed = 6;
WorkerAssignment active = 7;
}
}

// Portal -> Worker
message Query {
string query_id = 1;
Expand Down Expand Up @@ -146,7 +80,6 @@ message QueryFinished {
// Logs collector -> Worker
message LogsRequest {
uint64 from_timestamp_ms = 1;
uint64 to_timestamp_ms = 2;
optional string last_received_query_id = 3; // query ID of the last collected query
}

Expand All @@ -162,6 +95,7 @@ message QueryExecuted {

uint32 exec_time_micros = 6;
uint64 timestamp_ms = 11;
string worker_version = 15;
oneof result {
QueryOkSummary ok = 7;
QueryError err = 14;
Expand Down
Loading

0 comments on commit f0033c1

Please sign in to comment.