From f0033c16c69078f694e382c3a2c56afb823ce1bc Mon Sep 17 00:00:00 2001
From: Dzmitry Kalabuk
Date: Thu, 12 Dec 2024 13:54:43 +0300
Subject: [PATCH] Update transport library according to the RFC (#137)

Adapting the protocol to several breaking changes:
- logs are now pulled by the logs collector instead of being pushed by workers,
- the scheduler no longer actively participates in the network,
- pings (renamed to heartbeats) reference assignment files to keep the messages small,
- message signatures are updated to support verifying executed queries.

https://github.com/subsquid/specs/tree/main/network-rfc
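For illustration only (not part of the applied diff): with the new
`assignment_reader` feature enabled, a worker-side consumer of the pull-based
flow could look roughly like the sketch below. The function name and the
status-page URL argument are hypothetical; `Assignment::try_download` and its
signature come from this patch.

    use sqd_messages::assignments::Assignment;

    /// Poll the network state page; `try_download` returns `Ok(None)`
    /// when the assignment ID has not changed since the last poll.
    async fn poll_assignment(
        network_state_url: String,
        previous_id: Option<String>,
    ) -> anyhow::Result<Option<Assignment>> {
        Assignment::try_download(network_state_url, previous_id).await
    }

A worker then selects its own chunks with `dataset_chunks_for_peer_id` and
decrypts its HTTP headers with `headers_for_peer_id`, both defined in the
new `assignments` module further down.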
"16182b4f39a82ec8a6851155cc4c0cda3065bb1db33651726a29e1951de0f009" +dependencies = [ + "aead", + "crypto_secretbox", + "curve25519-dalek", + "salsa20", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto_secretbox" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1" +dependencies = [ + "aead", + "cipher", + "generic-array", + "poly1305", + "salsa20", + "subtle", + "zeroize", +] + [[package]] name = "ctr" version = "0.9.2" @@ -1342,7 +1383,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "syn 2.0.66", @@ -1404,7 +1445,7 @@ checksum = "e79e5973c26d4baf0ce55520bd732314328cabe53193286671b47144145b9649" dependencies = [ "chrono", "ethers-core", - "reqwest", + "reqwest 0.11.27", "semver", "serde", "serde_json", @@ -1429,7 +1470,7 @@ dependencies = [ "futures-locks", "futures-util", "instant", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", @@ -1457,12 +1498,12 @@ dependencies = [ "futures-timer", "futures-util", "hashers", - "http", + "http 0.2.12", "instant", "jsonwebtoken", "once_cell", "pin-project", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", @@ -1615,6 +1656,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1879,7 +1935,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", "indexmap 2.2.6", "slab", "tokio", @@ -2046,6 +2121,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -2053,7 +2139,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" 
+dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2085,9 +2194,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -2099,6 +2208,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -2106,11 +2235,64 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "rustls 0.21.12", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.5.0", + "hyper-util", + "rustls 0.23.10", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.5.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.0", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2201,8 +2383,8 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "log", "rand", "tokio", @@ -2303,7 +2485,7 @@ dependencies = [ "socket2", "widestring", "windows-sys 0.48.0", - "winreg", + "winreg 0.50.0", ] [[package]] @@ -3134,6 +3316,23 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "netlink-packet-core" version = "0.4.2" @@ -3374,6 +3573,50 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4059,11 +4302,11 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", + "hyper-rustls 0.24.2", "ipnet", "js-sys", "log", @@ -4072,21 +4315,64 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.12", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", "webpki-roots", - "winreg", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-rustls 0.27.3", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.2.0", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.52.0", ] [[package]] @@ -4269,11 +4555,20 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -4361,6 +4656,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4403,6 +4707,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +dependencies = [ + "bitflags 2.5.0", 
+ "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.23" @@ -4450,6 +4777,7 @@ version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ + "indexmap 2.2.6", "itoa", "ryu", "serde", @@ -4478,9 +4806,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.9.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" dependencies = [ "base64 0.22.1", "chrono", @@ -4496,9 +4824,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.9.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" dependencies = [ "darling", "proc-macro2", @@ -4685,7 +5013,7 @@ dependencies = [ [[package]] name = "sqd-contract-client" -version = "1.2.0" +version = "1.2.1" dependencies = [ "anyhow", "async-trait", @@ -4719,18 +5047,33 @@ name = "sqd-messages" version = "2.0.0" dependencies = [ "anyhow", + "base64 0.22.1", + "bs58", + "bytemuck", + "crypto_box", + "curve25519-dalek", + "flate2", "hex", + "hmac", "libp2p", + "log", "prost 0.13.1", "prost-build", + "reqwest 0.12.5", "semver", "serde", + "serde_json", + "serde_with", + "sha2", "sha3", + "sqd-network-transport", + "tokio", + "url", ] [[package]] name = "sqd-network-transport" -version = "1.0.25" +version = "2.0.0" dependencies = [ "anyhow", "async-trait", @@ -4821,7 +5164,7 @@ dependencies = [ "fs2", "hex", "once_cell", - "reqwest", + "reqwest 0.11.27", "semver", "serde", "serde_json", @@ -4859,6 +5202,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.13.1" @@ -5046,6 +5395,16 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -5056,6 +5415,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.10", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -5077,7 +5447,7 @@ dependencies = [ "log", "rustls 0.21.12", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tungstenite", "webpki-roots", ] @@ 
-5140,6 +5510,27 @@ dependencies = [ "winnow 0.6.13", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.2" @@ -5202,7 +5593,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand", @@ -5331,6 +5722,12 @@ dependencies = [ "serde", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -5692,6 +6089,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/crates/bootnode/src/main.rs b/crates/bootnode/src/main.rs index 9a3971e..2038dcf 100644 --- a/crates/bootnode/src/main.rs +++ b/crates/bootnode/src/main.rs @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); let cli = Cli::parse(); let listen_addrs = cli.transport.listen_addrs(); - let keypair = get_keypair(cli.transport.key).await?; + let keypair = get_keypair(Some(cli.transport.key)).await?; let local_peer_id = PeerId::from(keypair.public()); log::info!("Local peer ID: {local_peer_id}"); diff --git a/crates/contract-client/Cargo.toml b/crates/contract-client/Cargo.toml index 0881add..d6c26d1 100644 --- a/crates/contract-client/Cargo.toml +++ b/crates/contract-client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "sqd-contract-client" license = "AGPL-3.0-or-later" -version = "1.2.0" +version = "1.2.1" edition = "2021" [dependencies] @@ -9,14 +9,15 @@ edition = "2021" async-trait = "0.1" clap = { version = "4", features = ["derive", "env"] } ethers = { version = "2", features = ["ws"] } +futures = "0.3" libp2p = { workspace = true } log = "0.4" +num-rational = "0.4" serde = "1" thiserror = "1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1" url = "2" -num-rational = "0.4" [dev-dependencies] anyhow = "1" diff --git a/crates/contract-client/src/cli.rs b/crates/contract-client/src/cli.rs index d6c3ac4..aad7ea9 100644 --- a/crates/contract-client/src/cli.rs +++ b/crates/contract-client/src/cli.rs @@ -2,25 +2,25 @@ use clap::{Args, ValueEnum}; use crate::Address; -#[derive(Args)] +#[derive(Args, Clone)] pub struct RpcArgs { - #[arg( - long, - env, - help = "Blockchain RPC URL", - default_value = "http://127.0.0.1:8545/" - )] + /// Blockchain RPC URL + #[arg(long, env)] pub rpc_url: String, - #[arg( - long, - env, - help = "Layer 1 blockchain RPC URL. 
If not provided, rpc_url is assumed to be L1"
-    )]
-    pub l1_rpc_url: Option<String>,
+
+    /// Layer 1 blockchain RPC URL
+    #[arg(long, env)]
+    pub l1_rpc_url: String,
+
     #[command(flatten)]
     contract_addrs: ContractAddrs,
-    #[arg(long, env, help = "Network to connect to (mainnet or testnet)")]
+
+    /// Network to connect to (mainnet or testnet)
+    #[arg(long, env, default_value = "mainnet")]
     pub network: Network,
+
+    #[arg(env, hide(true), default_value_t = 500)]
+    pub contract_workers_per_page: usize,
 }
 
 impl RpcArgs {
@@ -55,7 +55,7 @@ impl RpcArgs {
     }
 }
 
-#[derive(Args)]
+#[derive(Args, Clone)]
 pub struct ContractAddrs {
     #[arg(long, env)]
     pub gateway_registry_contract_addr: Option<Address>,
@@ -114,3 +114,12 @@ impl Network {
         }
     }
 }
+
+impl std::fmt::Display for Network {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Tethys => write!(f, "tethys"),
+            Self::Mainnet => write!(f, "mainnet"),
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/contract-client/src/client.rs b/crates/contract-client/src/client.rs
index 6a2d06b..7f9c4fa 100644
--- a/crates/contract-client/src/client.rs
+++ b/crates/contract-client/src/client.rs
@@ -128,8 +128,7 @@ pub trait Client: Send + Sync + 'static {
     ) -> Result, ClientError>;
 
     /// Get the number of compute units available for the portal in the current epoch
-    async fn portal_compute_units_per_epoch(&self, portal_id: PeerId)
-        -> Result<u64, ClientError>;
+    async fn portal_compute_units_per_epoch(&self, portal_id: PeerId) -> Result<u64, ClientError>;
 
     /// Check if the portal uses the default strategy — allocates CUs evenly among workers
     async fn portal_uses_default_strategy(&self, portal_id: PeerId) -> Result<bool, ClientError>;
@@ -177,19 +176,13 @@ pub trait Client: Send + Sync + 'static {
 pub async fn get_client(rpc_args: &RpcArgs) -> Result<Box<dyn Client>, ClientError> {
     log::info!(
-        "Initializing contract client. network={:?} rpc_url={} l1_rpc_url={:?}",
+        "Initializing contract client. network={:?} rpc_url={} l1_rpc_url={}",
         rpc_args.network,
         rpc_args.rpc_url,
         rpc_args.l1_rpc_url
     );
     let l2_client = Transport::connect(&rpc_args.rpc_url).await?;
-    let l1_client = match &rpc_args.l1_rpc_url {
-        Some(rpc_url) => Transport::connect(rpc_url).await?,
-        None => {
-            log::warn!("Layer 1 RPC URL not provided. Assuming the main RPC URL is L1");
-            l2_client.clone()
-        }
-    };
+    let l1_client = Transport::connect(&rpc_args.l1_rpc_url).await?;
     let client: Box<dyn Client> = EthersClient::new(l1_client, l2_client, rpc_args).await?;
     Ok(client)
 }
@@ -204,6 +197,7 @@ struct EthersClient {
     allocations_viewer: AllocationsViewer>,
     default_strategy_addr: Address,
     multicall_contract_addr: Option<Address>,
+    active_workers_per_page: usize,
 }
 
 impl EthersClient {
@@ -236,6 +230,7 @@ impl EthersClient {
             allocations_viewer,
             default_strategy_addr,
             multicall_contract_addr: Some(rpc_args.multicall_addr()),
+            active_workers_per_page: rpc_args.contract_workers_per_page,
         }))
     }
 
@@ -308,16 +303,25 @@ impl Client for EthersClient {
     }
 
     async fn active_workers(&self) -> Result<Vec<Worker>, ClientError> {
-        let workers_call = self.worker_registration.method("getActiveWorkers", ())?;
-        let onchain_ids_call = self.worker_registration.method("getActiveWorkerIds", ())?;
-        let mut multicall = self.multicall().await?;
-        multicall
-            .add_call::<Vec<Worker>>(workers_call, false)
-            .add_call::<Vec<U256>>(onchain_ids_call, false);
-        let (workers, onchain_ids): (Vec<Worker>, Vec<U256>) = multicall.call().await?;
+        // A single getActiveWorkers call should be used instead, but it lacks pagination and runs out of gas
+
+        let onchain_ids: Vec<U256> =
+            self.worker_registration.get_active_worker_ids().call().await?;
+        let calls = onchain_ids.chunks(self.active_workers_per_page).map(|ids| async move {
+            let mut multicall = self.multicall().await?;
+            for id in ids {
+                multicall.add_call::<Worker>(
+                    self.worker_registration.method("getWorker", *id)?,
+                    false,
+                );
+            }
+            let workers: Vec<Worker> = multicall.call_array().await?;
+            Result::<_, ClientError>::Ok(workers)
+        });
+
+        let workers = futures::future::try_join_all(calls).await?.into_iter().flatten();
 
         let workers = workers
-            .into_iter()
             .zip(onchain_ids)
             .filter_map(|(worker, onchain_id)| match Worker::new(&worker, onchain_id) {
                 Ok(worker) => Some(worker),
@@ -431,10 +435,7 @@ impl Client for EthersClient {
         .collect())
     }
 
-    async fn portal_compute_units_per_epoch(
-        &self,
-        portal_id: PeerId,
-    ) -> Result<u64, ClientError> {
+    async fn portal_compute_units_per_epoch(&self, portal_id: PeerId) -> Result<u64, ClientError> {
         let id: Bytes = portal_id.to_bytes().into();
         let cus = self.gateway_registry.computation_units_available(id).call().await?;
         Ok(cus.try_into().expect("Computation units should not exceed u64 range"))
diff --git a/crates/messages/Cargo.toml b/crates/messages/Cargo.toml
index c759b0e..4ec3999 100644
--- a/crates/messages/Cargo.toml
+++ b/crates/messages/Cargo.toml
@@ -5,18 +5,38 @@ version = "2.0.0"
 edition = "2021"
 
 [features]
+assignment_reader = ["anyhow", "crypto_box", "flate2", "reqwest", "serde_json", "serde_with", "sha2", "tokio"]
+assignment_writer = ["anyhow", "base64", "bs58", "crypto_box", "curve25519-dalek", "hmac", "log", "serde_json", "serde_with", "sha2", "url"]
+bitstring = ["bytemuck", "flate2"]
 signatures = ["libp2p"]
 
 [dependencies]
-anyhow = "1"
+anyhow = { version = "1.0", optional = true }
+base64 = { version = "0.22.1", optional = true }
+bs58 = { version = "0.5.1", optional = true }
+bytemuck = { version = "1.19.0", optional = true, features = ["extern_crate_alloc"] }
+crypto_box = { version = "0.9.1", optional = true }
+curve25519-dalek = { version = "4.1.3", optional = true }
+flate2 = { version = "1.0.30", optional = true }
 hex = { version = "0.4", features = ["serde"] }
+hmac = { version = "0.12.1", optional = true }
+log = { version = "0.4", optional = true }
 prost = "0.13"
+reqwest = { version = "0.12.4", optional = true, features = ["json"] }
 semver = { version = "1", optional = true }
 serde = { version = "1", features = ["derive"] }
+serde_json = { version = "1.0.111", optional = true, features = ["preserve_order"] }
+serde_with = { version = "3.11.0", optional = true, features = ["base64"] }
+sha2 = { version = "0.10.8", optional = true }
 sha3 = "0.10"
+tokio = { version = "1.38", optional = true }
+url = { version = "2.5.0", optional = true }
 
 libp2p = { workspace = true, optional = true }
 
+[dev-dependencies]
+sqd-network-transport = { path = "../transport" }
+
 [build-dependencies]
 prost-build = "0.12"
diff --git a/crates/messages/build.rs b/crates/messages/build.rs
index d73c4ef..f9c92c2 100644
--- a/crates/messages/build.rs
+++ b/crates/messages/build.rs
@@ -5,9 +5,9 @@ fn main() -> std::io::Result<()> {
         .type_attribute(".", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
         .type_attribute("messages.Range", "#[derive(Copy, Ord, PartialOrd)]")
         .skip_debug(["messages.QueryOk"])
-        .field_attribute("messages.QueryResultSummary.hash", "#[serde(with = \"hex\")]")
+        .field_attribute("messages.QueryOkSummary.data_hash", "#[serde(with = \"hex\")]")
         .field_attribute("messages.Pong.ping_hash", "#[serde(with = \"hex\")]")
-        .field_attribute("messages.Ping.signature","#[serde(with = \"hex\")]")
+        .field_attribute("messages.OldPing.signature","#[serde(with = \"hex\")]")
         .field_attribute("messages.Query.signature", "#[serde(with = \"hex\")]")
         .protoc_arg("--experimental_allow_proto3_optional")
         .compile_protos(&["proto/messages.proto"], &["proto/"])?;
diff --git a/crates/messages/proto/messages.proto b/crates/messages/proto/messages.proto
index ec24846..e0f5660 100644
--- a/crates/messages/proto/messages.proto
+++ b/crates/messages/proto/messages.proto
@@ -8,26 +8,13 @@ message Range {
 }
 
 message RangeSet {
-    option deprecated = true;
-
     repeated Range ranges = 1;
 }
 
-message WorkerState {
-    option deprecated = true;
-
-    map<string, RangeSet> datasets = 1;
-}
-
-message DatasetRanges {
-    option deprecated = true;
-
-    string url = 1;
-    repeated Range ranges = 2;
-}
-
 message BitString {
-    bytes data = 1;
-    uint64 size = 2;
+    bytes data = 1; // deflate-compressed bytes, each one being 0x00 or 0x01
+    uint64 size = 2; // total number of bits in the original bitstring
+    uint64 ones = 3; // number of ones in the original bitstring
 }
 
 // Worker -> Scheduler, Portal, Ping collector
 message Heartbeat {
     optional uint64 stored_bytes = 4;
 }
 
-// Worker -> Scheduler, Portal, Ping collector
-message OldPing {
-    option deprecated = true;
-
-    optional string worker_id = 1;
-    optional string version = 2;
-    optional uint64 stored_bytes = 3;
-    repeated DatasetRanges stored_ranges = 4;
-    bytes signature = 5;
-}
-
-message HttpHeader {
-    option deprecated = true;
-
-    string name = 1;
-    string value = 2;
-}
-
-message AssignedChunk {
-    option deprecated = true;
-
-    string path = 1; // "0000000000/0000808640-0000816499-b0486318"
-    repeated uint32 filenames = 2; // index in known_filenames array
-}
-
-message DatasetChunks {
-    option deprecated = true;
-
-    string dataset_id = 1; // "s3://moonbeam-evm-1"
-    string download_url = 3; // "https://moonbeam-evm-1.sqd-datasets.io/"
-    repeated AssignedChunk chunks = 4;
-}
-
-message WorkerAssignment {
-    option deprecated = true;
-
-    repeated DatasetChunks dataset_chunks = 1;
-    repeated HttpHeader http_headers = 2;
-    repeated string known_filenames = 3; // "blocks.parquet"
-}
-
-// Scheduler -> Worker
-message Pong {
-    option deprecated = true;
-
-    bytes ping_hash = 1;
-    oneof status {
-        google.protobuf.Empty unsupported_version = 3;
-        string jailed = 6;
-        WorkerAssignment active = 7;
-    }
-}
-
 // Portal -> Worker
 message Query {
     string query_id = 1;
@@ -146,7 +80,6 @@ message QueryFinished {
 // Logs collector -> Worker
 message LogsRequest {
     uint64 from_timestamp_ms = 1;
-    uint64 to_timestamp_ms = 2;
     optional string last_received_query_id = 3; // query ID of the last collected query
 }
 
@@ -162,6 +95,7 @@
 message QueryExecuted {
     uint32 exec_time_micros = 6;
     uint64 timestamp_ms = 11;
+    string worker_version = 15;
 
     oneof result {
         QueryOkSummary ok = 7;
         QueryError err = 14;
diff --git a/crates/messages/src/assignments.rs b/crates/messages/src/assignments.rs
new file mode 100644
index 0000000..f355025
--- /dev/null
+++ b/crates/messages/src/assignments.rs
@@ -0,0 +1,483 @@
+use core::str;
+use std::borrow::Borrow;
+use std::collections::HashMap;
+#[cfg(feature = "assignment_reader")]
+use std::collections::{BTreeMap, VecDeque};
+use std::ops::Deref;
+use std::str::FromStr;
+
+use anyhow::anyhow;
+#[cfg(feature = "assignment_writer")]
+use crypto_box::aead::{AeadCore, OsRng};
+use crypto_box::{aead::Aead, PublicKey, SalsaBox, SecretKey};
+#[cfg(feature = "assignment_writer")]
+use curve25519_dalek::edwards::CompressedEdwardsY;
+#[cfg(feature = "assignment_reader")]
+use flate2::read::GzDecoder;
+use libp2p::PeerId;
+#[cfg(feature = "assignment_writer")]
+use log::error;
+#[cfg(feature = "assignment_reader")]
+use prost::bytes::Bytes;
+use serde::{Deserialize, Serialize};
+#[cfg(feature = "assignment_reader")]
+use serde_json::Value;
+use serde_with::base64::Base64;
+use serde_with::serde_as;
+#[cfg(feature = "assignment_reader")]
+use sha2::Digest;
+#[cfg(feature = "assignment_reader")]
+use sha2::Sha512;
+#[cfg(feature = "assignment_reader")]
+use sha3::digest::generic_array::GenericArray;
+
+#[cfg(feature = "assignment_writer")]
+use base64::{engine::general_purpose::STANDARD as base64, Engine};
+#[cfg(feature = "assignment_writer")]
+use hmac::{Hmac, Mac};
+#[cfg(feature = "assignment_writer")]
+use sha2::Sha256;
+#[cfg(feature = "assignment_writer")]
+use url::form_urlencoded;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Chunk {
+    pub id: String,
+    pub base_url: String,
+    pub files: HashMap<String, String>,
+    pub size_bytes: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Dataset {
+    pub id: String,
+    pub base_url: String,
+    pub chunks: Vec<Chunk>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+struct Headers {
+    worker_id: String,
+    worker_signature: String,
+}
+
+#[serde_as]
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+struct EncryptedHeaders {
+    #[serde_as(as = "Base64")]
+    identity: Vec<u8>,
+    #[serde_as(as = "Base64")]
+    nonce: Vec<u8>,
+    #[serde_as(as = "Base64")]
+    ciphertext: Vec<u8>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStatus {
+    #[serde(alias = "Ok")]
+    Ok,
+    Unreliable,
+    DeprecatedVersion,
+    UnsupportedVersion,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct WorkerAssignment {
+    status: WorkerStatus,
+    jail_reason: Option<String>,
+    chunks_deltas: Vec<u64>,
+    encrypted_headers: EncryptedHeaders,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Assignment {
+    pub datasets: Vec<Dataset>,
+    worker_assignments: HashMap<SerdePeerId, WorkerAssignment>,
+    #[cfg(feature = "assignment_writer")]
+    #[serde(skip)]
+    chunk_map: Option<HashMap<String, u64>>,
+    #[serde(skip)]
+    pub id: String,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkAssignment {
+    pub url: String,
+    pub id: String,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NetworkState {
+    pub network: String,
+    pub assignment: NetworkAssignment,
+}
+
+impl Assignment {
+    #[cfg(feature = "assignment_writer")]
+    pub fn add_chunk(&mut self, chunk: Chunk, dataset_id: String, dataset_url: String) {
+        match self.datasets.iter_mut().find(|dataset| dataset.id == dataset_id) {
+            Some(dataset) => dataset.chunks.push(chunk),
+            None => self.datasets.push(Dataset {
+                id: dataset_id,
+                base_url: dataset_url,
+                chunks: vec![chunk],
+            }),
+        }
+        self.chunk_map = None
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    async fn parse_compressed_assignment(
+        compressed_assignment: Bytes,
+    ) -> Result<Self, anyhow::Error> {
+        let task = tokio::task::spawn_blocking(move || {
+            let decoder = GzDecoder::new(&compressed_assignment[..]);
+            serde_json::from_reader(decoder)
+        });
+        Ok(task.await??)
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub async fn try_download(
+        url: String,
+        previous_id: Option<String>,
+    ) -> Result<Option<Self>, anyhow::Error> {
+        let response_state = reqwest::get(url).await?;
+        let network_state: NetworkState = response_state.json().await?;
+        if Some(network_state.assignment.id.clone()) == previous_id {
+            return Ok(None);
+        }
+        let assignment_url = network_state.assignment.url;
+        let response_assignment = reqwest::get(assignment_url).await?;
+        let compressed_assignment = response_assignment.bytes().await?;
+        let mut result = Self::parse_compressed_assignment(compressed_assignment).await?;
+        result.id = network_state.assignment.id;
+        Ok(Some(result))
+    }
+
+    #[cfg(feature = "assignment_writer")]
+    pub fn insert_assignment(
+        &mut self,
+        peer_id: PeerId,
+        jail_reason: Option<String>,
+        chunks_deltas: Vec<u64>,
+    ) {
+        let status = match jail_reason {
+            Some(_) => WorkerStatus::Unreliable,
+            None => WorkerStatus::Ok,
+        };
+        self.worker_assignments.insert(
+            peer_id.into(),
+            WorkerAssignment {
+                status,
+                jail_reason,
+                chunks_deltas,
+                encrypted_headers: Default::default(),
+            },
+        );
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub fn get_all_peer_ids(&self) -> Vec<PeerId> {
+        self.worker_assignments.keys().map(|peer_id| **peer_id).collect()
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub fn dataset_chunks_for_peer_id(&self, peer_id: &PeerId) -> Option<Vec<Dataset>> {
+        let local_assignment = self.worker_assignments.get(peer_id)?;
+        let mut result: Vec<Dataset> = Default::default();
+        let mut idxs: VecDeque<u64> = Default::default();
+        let mut cursor = 0;
+        for v in &local_assignment.chunks_deltas {
+            cursor += v;
+            idxs.push_back(cursor);
+        }
+        cursor = 0;
+        for u in &self.datasets {
+            if idxs.is_empty() {
+                break;
+            }
+            let mut filtered_chunks: Vec<Chunk> = Default::default();
+            for v in &u.chunks {
+                if idxs[0] < cursor {
+                    return None; // Malformed diffs
+                }
+                if idxs[0] == cursor {
+                    filtered_chunks.push(v.clone());
+                    idxs.pop_front();
+                }
+                if idxs.is_empty() {
+                    break;
+                }
+                cursor += 1;
+            }
+            if !filtered_chunks.is_empty() {
+                result.push(Dataset {
+                    id: u.id.clone(),
+                    base_url: u.base_url.clone(),
+                    chunks: filtered_chunks,
+                });
+            }
+        }
+        Some(result)
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub fn headers_for_peer_id(
+        &self,
+        peer_id: &PeerId,
+        secret_key: &Vec<u8>,
+    ) -> Result<BTreeMap<String, String>, anyhow::Error> {
+        let Some(local_assignment) = self.worker_assignments.get(peer_id) else {
+            return Err(anyhow!("Can not find assignment for {peer_id}"));
+        };
+        let EncryptedHeaders {
+            identity,
+            nonce,
+            ciphertext,
+        } = &local_assignment.encrypted_headers;
+        let temporary_public_key = PublicKey::from_slice(identity.as_slice())?;
+        let big_slice = Sha512::default().chain_update(secret_key).finalize();
+        let worker_secret_key = SecretKey::from_slice(&big_slice[00..32])?;
+        let shared_box = SalsaBox::new(&temporary_public_key, &worker_secret_key);
+        let generic_nonce = GenericArray::clone_from_slice(nonce);
+        let Ok(decrypted_plaintext) = shared_box.decrypt(&generic_nonce, &ciphertext[..]) else {
+            return Err(anyhow!("Can not decrypt payload"));
+        };
+        let plaintext_headers = std::str::from_utf8(&decrypted_plaintext)?;
+        let headers = serde_json::from_str::<Value>(plaintext_headers)?;
+        let mut result: BTreeMap<String, String> = Default::default();
+        let Some(headers_dict) = headers.as_object() else {
+            return Err(anyhow!("Can not parse encrypted map"));
+        };
+        for (k, v) in headers_dict {
+            result.insert(k.to_string(), v.as_str().unwrap().to_string());
+        }
+        Ok(result)
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub fn worker_status(&self, peer_id: &PeerId) -> Option<WorkerStatus> {
+        let local_assignment = self.worker_assignments.get(peer_id)?;
+        Some(local_assignment.status)
+    }
+
+    #[cfg(feature = "assignment_reader")]
+    pub fn worker_jail_reason(&self, peer_id: &PeerId) -> Result<Option<String>, anyhow::Error> {
+        let Some(local_assignment) = self.worker_assignments.get(peer_id) else {
+            return Err(anyhow!("Can not find assignment for {peer_id}"));
+        };
+        Ok(local_assignment.jail_reason.clone())
+    }
+
+    #[cfg(feature = "assignment_writer")]
+    pub fn chunk_index(&mut self, chunk_id: String) -> Option<u64> {
+        if self.chunk_map.is_none() {
+            let mut chunk_map: HashMap<String, u64> = Default::default();
+            let mut idx = 0;
+            for dataset in &self.datasets {
+                for chunk in &dataset.chunks {
+                    chunk_map.insert(chunk.id.clone(), idx);
+                    idx += 1;
+                }
+            }
+            self.chunk_map = Some(chunk_map);
+        };
+        self.chunk_map.as_ref().unwrap().get(&chunk_id).cloned()
+    }
+
+    #[cfg(feature = "assignment_writer")]
+    fn encrypt(
+        worker_id: &String,
+        secret_key: &SecretKey,
+        plaintext: &[u8],
+    ) -> Result<(Vec<u8>, Vec<u8>), anyhow::Error> {
+        let peer_id_decoded = bs58::decode(worker_id).into_vec()?;
+        if peer_id_decoded.len() != 38 {
+            return Err(anyhow!("WorkerID parsing failed"));
+        }
+        let pub_key_edwards_bytes = &peer_id_decoded[6..];
+        let public_edwards_compressed = CompressedEdwardsY::from_slice(pub_key_edwards_bytes)?;
+        let public_edwards = public_edwards_compressed
+            .decompress()
+            .ok_or(anyhow!("Failed to decompress Edwards point"))?;
+        let public_montgomery = public_edwards.to_montgomery();
+        let worker_public_key = PublicKey::from(public_montgomery);
+
+        let shared_box = SalsaBox::new(&worker_public_key, secret_key);
+        let nonce = SalsaBox::generate_nonce(&mut OsRng);
+        let ciphertext =
+            shared_box.encrypt(&nonce, plaintext).map_err(|err| anyhow!("Error {err:?}"))?;
+        Ok((ciphertext, nonce.to_vec()))
+    }
+
+    #[cfg(feature = "assignment_writer")]
+    pub fn regenerate_headers(&mut self, cloudflare_storage_secret: &str) {
+        let temporary_secret_key = SecretKey::generate(&mut OsRng);
+        let temporary_public_key_bytes = *temporary_secret_key.public_key().as_bytes();
+
+        for (worker_id, worker_assignment) in &mut self.worker_assignments {
+            let worker_signature =
+                timed_hmac_now(&worker_id.to_string(), cloudflare_storage_secret);
+
+            let headers = Headers {
+                worker_id: worker_id.to_string(),
+                worker_signature,
+            };
+            let plaintext = serde_json::to_vec(&headers).unwrap();
+            let (ciphertext, nonce) =
+                match Self::encrypt(&worker_id.to_string(), &temporary_secret_key, &plaintext) {
+                    Ok(val) => val,
+                    Err(err) => {
+                        error!("Error while processing headers for {worker_id}: {err:?}");
+                        continue;
+                    }
+                };
+
+            worker_assignment.encrypted_headers = EncryptedHeaders {
+                identity: temporary_public_key_bytes.to_vec(),
+                nonce,
+                ciphertext,
+            };
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+struct SerdePeerId {
+    peer_id: PeerId,
+}
+
+impl Deref for SerdePeerId {
+    type Target = PeerId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.peer_id
+    }
+}
+
+impl From<PeerId> for SerdePeerId {
+    fn from(peer_id: PeerId) -> Self {
+        SerdePeerId { peer_id }
+    }
+}
+
+impl Borrow<PeerId> for SerdePeerId {
+    fn borrow(&self) -> &PeerId {
+        &self.peer_id
+    }
+}
+
+impl std::fmt::Display for SerdePeerId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.peer_id.to_base58())
+    }
+}
+
+impl Serialize for SerdePeerId {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        self.peer_id.to_base58().serialize(serializer)
+    }
+}
+
+impl<'l> Deserialize<'l> for SerdePeerId {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'l>,
+    {
+        let s = String::deserialize(deserializer)?;
+        Ok(SerdePeerId {
+            peer_id: PeerId::from_str(&s).map_err(|_| {
+                serde::de::Error::invalid_value(serde::de::Unexpected::Str(&s), &"PeerId")
+            })?,
+        })
+    }
+}
+
+// Generate signature with timestamp as required by Cloudflare's is_timed_hmac_valid_v0 function
+// https://developers.cloudflare.com/ruleset-engine/rules-language/functions/#hmac-validation
+#[cfg(feature = "assignment_writer")]
+pub fn timed_hmac(message: &str, secret: &str, timestamp: usize) -> String {
+    let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
+    mac.update(format!("{}{}", message, timestamp).as_bytes());
+    let digest = mac.finalize().into_bytes();
+    let token: String =
+        form_urlencoded::byte_serialize(base64.encode(digest.as_slice()).as_bytes()).collect();
+    format!("{timestamp}-{token}")
+}
+
+#[cfg(feature = "assignment_writer")]
+pub fn timed_hmac_now(message: &str, secret: &str) -> String {
+    let timestamp = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs().try_into().unwrap();
+    timed_hmac(message, secret, timestamp)
+}
+
+#[cfg(feature = "assignment_writer")]
+#[test]
+fn test_hmac_sign() {
+    let message = "12D3KooWBwbQFT48cNYGPbDwm8rjasbZkc1VMo6rCR6217qr165S";
+    let secret = "test_secret";
+    let timestamp = 1715662737;
+    let expected = "1715662737-E%2BaW1Y5hS587YGeJFKGTnp%2Fhn8rEMmSRlEslPiOQsuE%3D";
+    assert_eq!(timed_hmac(message, secret, timestamp), expected);
+}
+
+#[cfg(all(feature = "assignment_writer", feature = "assignment_reader"))]
+#[cfg(test)]
+mod tests {
+    use sqd_network_transport::Keypair;
+
+    use super::*;
+
+    #[test]
+    fn it_works() {
+        let mut assignment: Assignment = Default::default();
+        let keypair = Keypair::generate_ed25519();
+        let peer_id = keypair.public().to_peer_id();
+        let private_key = keypair.try_into_ed25519().unwrap().secret();
+
+        assignment.insert_assignment(peer_id, Some("Ok".to_owned()), Default::default());
+        assignment.regenerate_headers("SUPERSECRET");
+        let headers = assignment
+            .headers_for_peer_id(&peer_id, &private_key.as_ref().to_vec())
+            .unwrap_or_default();
+        let decrypted_id = headers.get("worker-id").unwrap();
+        assert_eq!(peer_id.to_base58(), decrypted_id.to_owned());
+    }
+
+    #[test]
+    fn serialization() -> anyhow::Result<()> {
+        let mut assignment: Assignment = Default::default();
+        let chunk = Chunk {
+            id: "00000000/00000000-00001000-0xdeadbeef".to_owned(),
+            base_url: "00000000/00000000-00001000-0xdeadbeef".to_owned(),
+            files: [("blocks.parquet".to_owned(), "blocks.parquet".to_owned())]
+                .into_iter()
+                .collect(),
+            size_bytes: 100_000,
+        };
+        assignment.add_chunk(
+            chunk,
+            "s3://arbitrum-one".to_owned(),
+            "https://arbitrum-one.sqd-datasets.io".to_owned(),
+        );
+        assignment.insert_assignment(
+            PeerId::from_str("12D3KooWBwbQFT48cNYGPbDwm8rjasbZkc1VMo6rCR6217qr165S").unwrap(),
+            None,
+            vec![0],
+        );
+        let serialized = serde_json::ser::to_string_pretty(&assignment)?;
+        assert_eq!(serialized, include_str!("tests/assignment.json"));
+        Ok(())
+    }
+}
diff --git a/crates/messages/src/bitstring.rs b/crates/messages/src/bitstring.rs
new file mode 100644
index 0000000..f3c2832
--- /dev/null
+++ b/crates/messages/src/bitstring.rs
@@ -0,0 +1,71 @@
+use crate::BitString;
+
+impl BitString {
+    pub fn new(slice: &[bool]) -> Self {
+        use flate2::write::DeflateEncoder;
+        use std::io::Write;
+
+        let num_ones: u64 = slice.iter().map(|&b| b as u64).sum();
+
+        let byte_slice: &[u8] = bytemuck::cast_slice(slice);
+        let mut encoder = DeflateEncoder::new(Vec::new(), flate2::Compression::best());
+        encoder.write_all(byte_slice).expect("Couldn't compress data");
+        let compressed = encoder.finish().expect("Couldn't finish compressing data");
+
+        Self {
+            data: compressed,
+            size: slice.len() as u64,
+            ones: num_ones,
+        }
+    }
+
+    /// Each byte of the result is either 0x00 or 0x01.
+    pub fn to_bytes(&self) -> Vec<u8> {
+        use flate2::bufread::DeflateDecoder;
+        use std::io::Read;
+
+        let mut decoder = DeflateDecoder::new(&self.data[..]);
+        let mut decompressed = Vec::new();
+        decoder.read_to_end(&mut decompressed).expect("Couldn't decompress data");
+        decompressed
+    }
+
+    pub fn to_vec(&self) -> Result<Vec<bool>, &'static str> {
+        self.to_bytes()
+            .into_iter()
+            .map(|byte| match byte {
+                0 => Ok(false),
+                1 => Ok(true),
+                _ => Err("invalid boolean value in parsed BitString"),
+            })
+            .collect()
+    }
+
+    /// The total number of ones in the encoded bitstring
+    pub fn ones(&self) -> u64 {
+        self.ones
+    }
+
+    /// The total number of zeros in the encoded bitstring
+    pub fn zeros(&self) -> u64 {
+        self.size - self.ones
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_conversion() {
+        let original = vec![true, false, true, true, false, true, true, false];
+        let bitstring = BitString::new(&original);
+        assert_eq!(bitstring.size, 8);
+        assert_eq!(bitstring.ones(), 5);
+        assert_eq!(bitstring.zeros(), 3);
+        let bytes = bitstring.to_bytes();
+        assert_eq!(bytes, [1, 0, 1, 1, 0, 1, 1, 0]);
+        let bools = bitstring.to_vec().unwrap();
+        assert_eq!(original, bools);
+    }
+}
diff --git a/crates/messages/src/data_chunk.rs b/crates/messages/src/data_chunk.rs
index 208e2ae..1b2cd2a 100644
--- a/crates/messages/src/data_chunk.rs
+++ b/crates/messages/src/data_chunk.rs
@@ -2,14 +2,14 @@ use crate::Range;
 
 #[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
 pub struct DataChunk {
-    top: u32,
-    first_block: u32,
-    last_block: u32,
+    top: u64,
+    first_block: u64,
+    last_block: u64,
     last_hash: String,
 }
 
 impl DataChunk {
-    pub fn new(top: u32, first_block: u32, last_block: u32, last_hash: String) -> Self {
+    pub fn new(top: u64, first_block: u64, last_block: u64, last_hash: String) -> Self {
         assert!(top <= first_block);
         assert!(first_block <= last_block);
         Self {
@@ -21,17 +21,17 @@ impl DataChunk {
     }
 
     #[inline]
-    pub fn top(&self) -> u32 {
+    pub fn top(&self) -> u64 {
         self.top
     }
 
     #[inline]
-    pub fn first_block(&self) -> u32 {
+    pub fn first_block(&self) -> u64 {
         self.first_block
     }
 
     #[inline]
-    pub fn last_block(&self) -> u32 {
+    pub fn last_block(&self) -> u64 {
         self.last_block
     }
 }
@@ -58,11 +58,11 @@ impl std::str::FromStr for DataChunk {
     #[allow(clippy::get_first)]
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         let top_range_split = s.split('/').collect::<Vec<&str>>();
-        let top: u32 = top_range_split.get(0).ok_or(())?.parse().or(Err(()))?;
+        let top: u64 = top_range_split.get(0).ok_or(())?.parse().or(Err(()))?;
         let range_str = top_range_split.get(1).ok_or(())?;
         let range_split = range_str.split('-').collect::<Vec<&str>>();
-        let beg: u32 = range_split.get(0).ok_or(())?.parse().or(Err(()))?;
-        let end: u32 = range_split.get(1).ok_or(())?.parse().or(Err(()))?;
+        let beg: u64 = range_split.get(0).ok_or(())?.parse().or(Err(()))?;
+        let end: u64 = range_split.get(1).ok_or(())?.parse().or(Err(()))?;
         let hash = range_split.get(2).ok_or(())?;
         if top <= beg && beg <= end {
             Ok(Self::new(top, beg, end, (*hash).to_string()))
diff --git a/crates/messages/src/lib.rs b/crates/messages/src/lib.rs
index ea19118..55032a4 100644
--- a/crates/messages/src/lib.rs
+++ b/crates/messages/src/lib.rs
@@ -14,15 +14,14 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::{
-    collections::HashMap,
-    fmt::{Debug, Formatter},
-    ops::{Deref, DerefMut},
-    time::Duration,
-};
+use std::fmt::{Debug, Formatter};
 
 pub use prost::Message as ProstMsg;
 
+#[cfg(any(feature = "assignment_reader", feature = "assignment_writer"))]
+pub mod assignments;
+#[cfg(feature = "bitstring")]
+pub mod bitstring;
 pub mod data_chunk;
 pub mod range;
 #[cfg(feature = "signatures")]
@@ -32,26 +31,6 @@ mod versions;
 
 include!(concat!(env!("OUT_DIR"), "/messages.rs"));
 
-impl Deref for WorkerState {
-    type Target = HashMap<String, RangeSet>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.datasets
-    }
-}
-
-impl DerefMut for WorkerState {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.datasets
-    }
-}
-
-impl From<HashMap<String, RangeSet>> for WorkerState {
-    fn from(datasets: HashMap<String, RangeSet>) -> Self {
-        Self { datasets }
-    }
-}
-
 impl Debug for QueryOk {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(f, "QueryOk {{ data: <{} bytes> }}", self.data.len(),)
@@ -64,17 +43,8 @@
     }
 }
 
 impl From<QueryOk> for query_result::Result {
 }
 
-impl QueryResult {
-    pub fn new(
-        query_id: String,
-        result: impl Into<query_result::Result>,
-        retry_after: Option<Duration>,
-    ) -> Self {
-        Self {
-            query_id,
-            result: Some(result.into()),
-            retry_after_ms: retry_after.map(|d| d.as_millis() as u32),
-            ..Default::default()
-        }
+impl From<query_error::Err> for query_executed::Result {
+    fn from(err: query_error::Err) -> Self {
+        query_executed::Result::Err(QueryError { err: Some(err) })
     }
 }
diff --git a/crates/messages/src/range.rs b/crates/messages/src/range.rs
index 873cf18..065911d 100644
--- a/crates/messages/src/range.rs
+++ b/crates/messages/src/range.rs
@@ -3,7 +3,7 @@ use std::cmp::{max, Ordering};
 pub use crate::{Range, RangeSet};
 
 impl Range {
-    pub fn new(begin: u32, end: u32) -> Self {
+    pub fn new(begin: u64, end: u64) -> Self {
         assert!(begin <= end);
         Self { begin, end }
     }
@@ -14,15 +14,15 @@ impl RangeSet {
         Self::default()
     }
 
-    pub fn has(&self, point: u32) -> bool {
+    pub fn has(&self, point: u64) -> bool {
         self.containing_range(point).is_some()
     }
 
-    pub fn find_containing_range(&self, point: u32) -> Option<Range> {
+    pub fn find_containing_range(&self, point: u64) -> Option<Range> {
         self.containing_range(point).map(|i| self.ranges[i])
     }
 
-    fn containing_range(&self, point: u32) -> Option<usize> {
+    fn containing_range(&self, point: u64) -> Option<usize> {
         self.ranges
             .binary_search_by(|i| {
                 if point < i.begin {
diff --git a/crates/messages/src/signatures.rs b/crates/messages/src/signatures.rs
index df69e66..f33bd97 100644
--- a/crates/messages/src/signatures.rs
+++ b/crates/messages/src/signatures.rs
@@ -1,3 +1,5 @@
+use std::mem::{size_of, size_of_val};
+
 use sha3::{Digest, Sha3_256};
 
 use libp2p::{
diff --git a/crates/messages/src/tests/assignment.json b/crates/messages/src/tests/assignment.json
new file mode 100644
index 0000000..d76d3bb
--- /dev/null
+++ b/crates/messages/src/tests/assignment.json
@@ -0,0 +1,32 @@
+{
+    "datasets": [
+        {
+            "id": "s3://arbitrum-one",
+            "baseUrl": "https://arbitrum-one.sqd-datasets.io",
+            "chunks": [
+                {
+                    "id": "00000000/00000000-00001000-0xdeadbeef",
+                    "baseUrl": "00000000/00000000-00001000-0xdeadbeef",
+                    "files": {
+                        "blocks.parquet": "blocks.parquet"
+                    },
+                    "sizeBytes": 100000
+                }
+            ]
+        }
+    ],
+    "workerAssignments": {
+        "12D3KooWBwbQFT48cNYGPbDwm8rjasbZkc1VMo6rCR6217qr165S": {
+            "status": "ok",
+            "jailReason": null,
+            "chunksDeltas": [
+                0
+            ],
+            "encryptedHeaders": {
+                "identity": "",
+                "nonce": "",
+                "ciphertext": ""
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/messages/src/versions.rs b/crates/messages/src/versions.rs
index 7b18296..963f090 100644
--- a/crates/messages/src/versions.rs
+++ b/crates/messages/src/versions.rs
@@ -1,4 +1,4 @@
-use crate::{Heartbeat, OldPing};
+use crate::Heartbeat;
 
 impl Heartbeat {
     pub fn version_matches(&self, req: &semver::VersionReq) -> bool {
@@ -9,15 +9,6 @@ impl Heartbeat {
     }
 }
 
-impl OldPing {
-    pub fn version_matches(&self, req: &semver::VersionReq) -> bool {
-        let Some(version) = self.version.as_ref().and_then(|v| v.parse().ok()) else {
-            return false;
-        };
-        version_matches(&version, req)
-    }
-}
-
 fn version_matches(version: &semver::Version, req: &semver::VersionReq) -> bool {
     if req.matches(version) {
         return true;
diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml
index 0dde6e6..590c4f7 100644
--- a/crates/transport/Cargo.toml
+++ b/crates/transport/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "sqd-network-transport"
 license = "AGPL-3.0-or-later"
-version = "1.0.25"
+version = "2.0.0"
 edition = "2021"
 
 [dependencies]
@@ -38,7 +38,7 @@
 proto = []
 request-client = []
 request-server = []
 gateway = ["actors", "request-client", "proto"]
-logs-collector = ["actors", "request-server", "proto"]
+logs-collector = ["actors", "request-client", "proto"]
 observer = ["actors"]
 peer-checker = ["actors"]
 pings-collector = ["actors"]
diff --git a/crates/transport/src/actors/gateway.rs b/crates/transport/src/actors/gateway.rs
index 97e1cba..6d0eaaf 100644
--- a/crates/transport/src/actors/gateway.rs
+++ b/crates/transport/src/actors/gateway.rs
@@ -17,10 +17,7 @@ use prost::Message;
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 
-use sqd_messages::{
-    gateway_log_msg, query_result, GatewayLogMsg, Ping, Query, QueryFinished, QueryResult,
-    QuerySubmitted,
-};
+use sqd_messages::{Heartbeat, Query, QueryResult};
 
 use crate::{
     behaviour::{
         base::{BaseBehaviour, BaseBehaviourEvent},
@@ -28,11 +25,8 @@ use crate::{
         request_client::{ClientBehaviour, ClientConfig, ClientEvent, Timeout},
         wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
     },
-    codec::{ProtoCodec, ACK_SIZE},
-    protocol::{
-        GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE,
-        QUERY_PROTOCOL,
-    },
+    codec::ProtoCodec,
+    protocol::{MAX_QUERY_RESULT_SIZE, MAX_QUERY_MSG_SIZE, QUERY_PROTOCOL},
     record_event,
    util::{new_queue, Receiver, Sender, TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
     QueueFull,
@@ -40,34 +34,37 @@
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum GatewayEvent {
-    Ping {
+    Heartbeat {
         peer_id: PeerId,
-        ping: Ping,
+        heartbeat: Heartbeat,
     },
     QueryResult {
         peer_id: PeerId,
-        result: QueryResult,
+        query_id: String,
+        result: Result<QueryResult, QueryFailure>,
     },
     QueryDropped {
         query_id: String,
     },
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub enum QueryFailure {
+    Timeout(Timeout),
+    ValidationError(String),
+    TransportError(String),
+}
+
 #[derive(NetworkBehaviour)]
 pub struct InnerBehaviour {
     base: Wrapped<BaseBehaviour>,
     query: Wrapped<ClientBehaviour<ProtoCodec<Query, QueryResult>>>,
-    logs: Wrapped<ClientBehaviour<ProtoCodec<GatewayLogMsg, u32>>>,
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct GatewayConfig {
-    pub logs_collector_id: PeerId,
     pub query_config: ClientConfig,
-    pub logs_config: ClientConfig,
-    pub max_query_size: u64,
     pub max_query_result_size: u64,
-    pub max_query_log_size: u64,
     pub queries_queue_size: usize,
     pub logs_queue_size: usize,
     pub events_queue_size: usize,
@@ -75,14 +72,10 @@ pub struct GatewayConfig {
 }
 
 impl GatewayConfig {
-    pub fn new(logs_collector_id: PeerId) -> Self {
+    pub fn new() -> Self {
         Self {
-            logs_collector_id,
             query_config: Default::default(),
-            logs_config: Default::default(),
-            max_query_size: MAX_QUERY_SIZE,
             max_query_result_size: MAX_QUERY_RESULT_SIZE,
-            max_query_log_size: MAX_GATEWAY_LOG_SIZE,
             queries_queue_size: 100,
             logs_queue_size: 100,
             events_queue_size: 100,
@@ -93,33 +86,25 @@
 
 pub struct GatewayBehaviour {
     inner: InnerBehaviour,
-    logs_collector_id: PeerId,
     query_ids: BTreeMap<OutboundRequestId, String>,
     dropped_queries: VecDeque<String>,
 }
 
 impl GatewayBehaviour {
     pub fn new(mut base: BaseBehaviour, config: GatewayConfig) -> Wrapped<Self> {
-        base.subscribe_pings();
-        base.allow_peer(config.logs_collector_id);
+        // TODO: whitelist service nodes
+        base.subscribe_heartbeats();
         let inner = InnerBehaviour {
            base: base.into(),
            query: ClientBehaviour::new(
-                ProtoCodec::new(config.max_query_size, config.max_query_result_size),
+                ProtoCodec::new(MAX_QUERY_MSG_SIZE, config.max_query_result_size),
                QUERY_PROTOCOL,
                config.query_config,
            )
            .into(),
-            logs: ClientBehaviour::new(
-                ProtoCodec::new(config.max_query_log_size, ACK_SIZE),
-                GATEWAY_LOGS_PROTOCOL,
-                config.logs_config,
-            )
-            .into(),
         };
         Self {
             inner,
-            logs_collector_id: config.logs_collector_id,
             query_ids: Default::default(),
             dropped_queries: Default::default(),
         }
     }
 
     fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option<GatewayEvent> {
         match ev {
-            BaseBehaviourEvent::Ping { peer_id, ping } => self.on_ping(peer_id, ping),
+            BaseBehaviourEvent::Heartbeat { peer_id, heartbeat } => {
+                self.on_heartbeat(peer_id, heartbeat)
+            }
             _ => None,
         }
     }
 
-    fn on_ping(&mut self, peer_id: PeerId, ping: Ping) -> Option<GatewayEvent> {
-        log::debug!("Got ping from {peer_id}");
-        log::trace!("{ping:?}");
-        Some(GatewayEvent::Ping { peer_id, ping })
+    fn on_heartbeat(&mut self, peer_id: PeerId, heartbeat: Heartbeat) -> Option<GatewayEvent> {
+        log::debug!("Got heartbeat from {peer_id}");
+        log::trace!("{heartbeat:?}");
+        Some(GatewayEvent::Heartbeat { peer_id, heartbeat })
     }
 
     fn take_query_id(&mut self, req_id: &OutboundRequestId) -> Option<String> {
@@ -154,19 +141,18 @@
     ) -> Option<GatewayEvent> {
         log::debug!("Got query result from {peer_id}: {result:?}");
         let query_id = self.take_query_id(&req_id)?;
-        match validate_query_result(&query_id, &result) {
-            Ok(()) => Some(GatewayEvent::QueryResult { peer_id, result }),
+        let result = match validate_query_result(&query_id, &result) {
+            Ok(()) => Ok(result),
             Err(err) => {
                 log::warn!("Invalid result for query {query_id} from peer {peer_id}: {err}");
-                Some(GatewayEvent::QueryResult {
-                    peer_id,
-                    result: QueryResult {
-                        query_id,
-                        result: Some(query_result::Result::ServerError(err.to_string())),
-                    },
-                })
+                Err(QueryFailure::ValidationError(err.to_owned()))
             }
-        }
+        };
+        Some(GatewayEvent::QueryResult {
+            peer_id,
+            result,
+            query_id,
+        })
     }
 
     fn on_query_failure(
+163,11 @@ impl GatewayBehaviour { ) -> Option { log::debug!("Query failure: {error} (peer_id={peer_id})"); let query_id = self.take_query_id(&req_id)?; - let result = QueryResult { + Some(GatewayEvent::QueryResult { query_id, - result: Some(query_result::Result::ServerError(error)), - }; - Some(GatewayEvent::QueryResult { peer_id, result }) + peer_id, + result: Err(QueryFailure::TransportError(error)), + }) } fn on_query_event(&mut self, ev: ClientEvent) -> Option { @@ -214,39 +200,26 @@ impl GatewayBehaviour { peer_id: PeerId, timeout: Timeout, ) -> Option { - let Some(query_id) = self.query_ids.remove(&req_id) else { - log::error!("Unknown request ID: {req_id}"); - return None; - }; + let query_id = self.take_query_id(&req_id)?; log::debug!("Query {query_id} timed out"); Some(GatewayEvent::QueryResult { + query_id, peer_id, - result: QueryResult::new(query_id, query_result::Result::Timeout(timeout.to_string())), + result: Err(QueryFailure::Timeout(timeout)), }) } - fn on_logs_event(&mut self, ev: &ClientEvent) -> Option { - log::debug!("Logs event: {ev:?}"); - match ev { - ClientEvent::PeerUnknown { peer_id } => self.inner.base.find_and_dial(*peer_id), - ClientEvent::Timeout { .. } => log::warn!("Sending logs to collector timed out"), - _ => {} - } - None - } - pub fn send_query(&mut self, peer_id: PeerId, mut query: Query) { log::debug!("Sending query {query:?} to {peer_id}"); - // Validate if query has ID and sign it - let query_id = match query.query_id.as_ref() { - Some(id) => id.clone(), - None => return log::error!("Query without ID dropped"), - }; - self.inner.base.sign(&mut query); + // Sign the query + let query_id = query.query_id.clone(); + query + .sign(self.inner.base.keypair(), peer_id) + .expect("query should be valid to sign"); // Validate query size let query_size = query.encoded_len() as u64; - if query_size > MAX_QUERY_SIZE { + if query_size > MAX_QUERY_MSG_SIZE { return log::error!("Query size too large: {query_size}"); } @@ -254,21 +227,8 @@ impl GatewayBehaviour { if let Ok(req_id) = self.inner.query.try_send_request(peer_id, query) { self.query_ids.insert(req_id, query_id); } else { - log::warn!("Outbound message queue full. 
Query {query_id} dropped."); self.dropped_queries.push_back(query_id) - } - } - - pub fn send_log_msg(&mut self, msg: GatewayLogMsg) { - log::debug!("Sending log message: {msg:?}"); - - let msg_size = msg.encoded_len() as u64; - if msg_size > MAX_GATEWAY_LOG_SIZE { - return log::error!("Log message size too large: {msg_size}"); - } - - if self.inner.logs.try_send_request(self.logs_collector_id, msg).is_err() { - log::error!("Cannot send query logs: outbound queue full") + // TODO: notify the waker } } } @@ -300,7 +260,6 @@ impl BehaviourWrapper for GatewayBehaviour { let ev = match ev { InnerBehaviourEvent::Base(ev) => self.on_base_event(ev), InnerBehaviourEvent::Query(query_res) => self.on_query_event(query_res), - InnerBehaviourEvent::Logs(ev) => self.on_logs_event(&ev), }; ev.map(ToSwarm::GenerateEvent) } @@ -318,7 +277,6 @@ impl BehaviourWrapper for GatewayBehaviour { struct GatewayTransport { swarm: Swarm>, queries_rx: Receiver<(PeerId, Query)>, - logs_rx: Receiver, events_tx: Sender, } @@ -330,7 +288,6 @@ impl GatewayTransport { _ = cancel_token.cancelled() => break, ev = self.swarm.select_next_some() => self.on_swarm_event(ev), Some((peer_id, query)) = self.queries_rx.recv() => self.swarm.behaviour_mut().send_query(peer_id, query), - Some(log_msg) = self.logs_rx.recv() => self.swarm.behaviour_mut().send_log_msg(log_msg), } } log::info!("Shutting down gateway P2P transport"); @@ -348,14 +305,12 @@ impl GatewayTransport { #[derive(Clone)] pub struct GatewayTransportHandle { queries_tx: Sender<(PeerId, Query)>, - logs_tx: Sender, _task_manager: Arc, } impl GatewayTransportHandle { fn new( queries_tx: Sender<(PeerId, Query)>, - logs_tx: Sender, transport: GatewayTransport, shutdown_timeout: Duration, ) -> Self { @@ -363,7 +318,6 @@ impl GatewayTransportHandle { task_manager.spawn(|c| transport.run(c)); Self { queries_tx, - logs_tx, _task_manager: Arc::new(task_manager), } } @@ -371,18 +325,6 @@ impl GatewayTransportHandle { log::debug!("Queueing query {query:?}"); self.queries_tx.try_send((peer_id, query)) } - - pub fn query_submitted(&self, msg: QuerySubmitted) -> Result<(), QueueFull> { - log::debug!("Queueing QuerySubmitted message: {msg:?}"); - let msg = gateway_log_msg::Msg::QuerySubmitted(msg).into(); - self.logs_tx.try_send(msg) - } - - pub fn query_finished(&self, msg: QueryFinished) -> Result<(), QueueFull> { - log::debug!("Queueing QueryFinished message: {msg:?}"); - let msg = gateway_log_msg::Msg::QueryFinished(msg).into(); - self.logs_tx.try_send(msg) - } } pub fn start_transport( @@ -390,15 +332,12 @@ pub fn start_transport( config: GatewayConfig, ) -> (impl Stream, GatewayTransportHandle) { let (queries_tx, queries_rx) = new_queue(config.queries_queue_size, "queries"); - let (logs_tx, logs_rx) = new_queue(config.logs_queue_size, "logs"); let (events_tx, events_rx) = new_queue(config.events_queue_size, "events"); let transport = GatewayTransport { swarm, queries_rx, - logs_rx, events_tx, }; - let handle = - GatewayTransportHandle::new(queries_tx, logs_tx, transport, config.shutdown_timeout); + let handle = GatewayTransportHandle::new(queries_tx, transport, config.shutdown_timeout); (events_rx, handle) } diff --git a/crates/transport/src/actors/logs_collector.rs b/crates/transport/src/actors/logs_collector.rs index 67fbaf8..3d1f6b5 100644 --- a/crates/transport/src/actors/logs_collector.rs +++ b/crates/transport/src/actors/logs_collector.rs @@ -1,134 +1,151 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + 
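With the change above, consumers match on `Result<QueryResult, QueryFailure>` instead of inspecting a synthesized `ServerError`. A sketch of the consuming side, assuming the event stream returned by `start_transport` (the handler itself is illustrative):

    use futures::{Stream, StreamExt};
    use sqd_network_transport::{GatewayEvent, QueryFailure};

    async fn handle_events(mut events: impl Stream<Item = GatewayEvent> + Unpin) {
        while let Some(ev) = events.next().await {
            match ev {
                GatewayEvent::QueryResult { peer_id, query_id, result } => match result {
                    Ok(res) => log::info!("Query {query_id} answered by {peer_id}: {res:?}"),
                    Err(QueryFailure::Timeout(t)) => log::warn!("Query {query_id} timed out: {t:?}"),
                    Err(QueryFailure::ValidationError(e)) => {
                        log::warn!("Invalid result for query {query_id}: {e}")
                    }
                    Err(QueryFailure::TransportError(e)) => {
                        log::warn!("Transport error for query {query_id}: {e}")
                    }
                },
                GatewayEvent::Heartbeat { peer_id, .. } => log::debug!("Heartbeat from {peer_id}"),
                GatewayEvent::QueryDropped { query_id } => log::warn!("Query {query_id} dropped"),
            }
        }
    }
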
diff --git a/crates/transport/src/actors/logs_collector.rs b/crates/transport/src/actors/logs_collector.rs
index 67fbaf8..3d1f6b5 100644
--- a/crates/transport/src/actors/logs_collector.rs
+++ b/crates/transport/src/actors/logs_collector.rs
@@ -1,134 +1,151 @@
-use std::{sync::Arc, time::Duration};
+use std::{
+    collections::HashMap,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
 
-use futures::StreamExt;
-use futures_core::Stream;
+use futures::{FutureExt, StreamExt};
 use libp2p::{
     swarm::{NetworkBehaviour, SwarmEvent, ToSwarm},
     PeerId, Swarm,
 };
 use libp2p_swarm_derive::NetworkBehaviour;
+use parking_lot::Mutex;
+use prost::Message;
 use serde::{Deserialize, Serialize};
-use tokio_util::sync::CancellationToken;
+use thiserror::Error;
+use tokio::sync::oneshot;
 
-use sqd_messages::{
-    gateway_log_msg, signatures::SignedMessage, GatewayLogMsg, LogsCollected, QueryExecuted,
-    QueryFinished, QueryLogs, QuerySubmitted,
-};
+use sqd_messages::{LogsRequest, QueryLogs};
+use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
 
 use crate::{
     behaviour::{
-        base::{BaseBehaviour, BaseBehaviourEvent},
-        request_server::{Request, ServerBehaviour},
+        base::BaseBehaviour,
+        request_client::{ClientBehaviour, ClientEvent, Timeout},
         wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
     },
-    codec::{ProtoCodec, ACK_SIZE},
-    protocol::{GATEWAY_LOGS_PROTOCOL, MAX_GATEWAY_LOG_SIZE},
-    record_event,
-    util::{new_queue, Receiver, Sender, TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
-    QueueFull,
+    codec::ProtoCodec,
+    protocol::{MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, WORKER_LOGS_PROTOCOL},
+    record_event, ClientConfig, QueueFull,
 };
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum LogsCollectorEvent {
-    /// Worker reports executed queries (a bundle)
-    WorkerLogs {
-        peer_id: PeerId,
-        logs: Vec<QueryExecuted>,
-    },
-    /// Gateway reports a submitted query
-    QuerySubmitted(QuerySubmitted),
-    /// Gateway reports a finished query (result received or timeout)
-    QueryFinished(QueryFinished),
+pub enum LogsCollectorEvent {}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Error)]
+pub enum FetchLogsError {
+    #[error("Timeout: {0}")]
+    Timeout(Timeout),
+    #[error("Failure: {0}")]
+    Failure(String),
 }
 
 #[derive(NetworkBehaviour)]
 pub struct InnerBehaviour {
     base: Wrapped<BaseBehaviour>,
-    gateway_logs: Wrapped<ServerBehaviour<ProtoCodec<GatewayLogMsg, u32>>>,
+    logs: Wrapped<ClientBehaviour<ProtoCodec<LogsRequest, QueryLogs>>>,
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct LogsCollectorConfig {
-    pub max_gateway_log_size: u64,
-    pub logs_collected_queue_size: usize,
-    pub events_queue_size: usize,
-    pub shutdown_timeout: Duration,
-    pub logs_ack_timeout: Duration,
+    pub request_config: ClientConfig,
 }
 
 impl Default for LogsCollectorConfig {
     fn default() -> Self {
         Self {
-            max_gateway_log_size: MAX_GATEWAY_LOG_SIZE,
-            logs_collected_queue_size: 100,
-            events_queue_size: 100,
-            shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
-            logs_ack_timeout: Duration::from_secs(5),
+            request_config: Default::default(),
         }
     }
 }
 
 pub struct LogsCollectorBehaviour {
     inner: InnerBehaviour,
+    resp_senders: HashMap<PeerId, oneshot::Sender<Result<QueryLogs, FetchLogsError>>>,
 }
 
 impl LogsCollectorBehaviour {
     pub fn new(
-        mut base: BaseBehaviour,
-        local_peer_id: PeerId,
+        base: BaseBehaviour,
+        _local_peer_id: PeerId,
         config: LogsCollectorConfig,
     ) -> Wrapped<Self> {
-        base.subscribe_worker_logs(local_peer_id);
         Self {
             inner: InnerBehaviour {
                 base: base.into(),
-                gateway_logs: ServerBehaviour::new(
-                    ProtoCodec::new(config.max_gateway_log_size, ACK_SIZE),
-                    GATEWAY_LOGS_PROTOCOL,
-                    config.logs_ack_timeout,
+                logs: ClientBehaviour::new(
+                    ProtoCodec::new(MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE),
+                    WORKER_LOGS_PROTOCOL,
+                    config.request_config,
                 )
                 .into(),
             },
+            resp_senders: Default::default(),
         }
         .into()
     }
 
-    fn on_worker_logs(&mut self, peer_id: PeerId, logs: QueryLogs) -> Option<LogsCollectorEvent> {
-        let mut logs = logs.queries_executed;
-        log::debug!("Got {} query logs from {peer_id}", logs.len());
-        let worker_id = peer_id.to_base58();
-        logs = logs
-            .into_iter()
-            .filter_map(|mut log| {
-                (log.worker_id == worker_id && log.verify_signature(&peer_id)).then_some(log)
-            })
-            .collect();
-        (!logs.is_empty()).then_some(LogsCollectorEvent::WorkerLogs { peer_id, logs })
-    }
-
-    fn on_gateway_log(
+    fn on_worker_logs(
         &mut self,
         peer_id: PeerId,
-        log_msg: GatewayLogMsg,
+        mut logs: QueryLogs,
     ) -> Option<LogsCollectorEvent> {
-        log::debug!("Got gateway log from {peer_id}: {log_msg:?}");
-        let gateway_id = peer_id.to_base58();
-        match log_msg.msg {
-            Some(gateway_log_msg::Msg::QueryFinished(log)) if log.client_id == gateway_id => {
-                Some(LogsCollectorEvent::QueryFinished(log))
+        log::debug!("Got {} query logs from {peer_id}", logs.queries_executed.len());
+        logs.queries_executed.retain(|log| {
+            if log.verify_client_signature(peer_id) {
+                true
+            } else {
+                log::warn!("Invalid client signature in query log: {log:?}");
+                false
             }
-            Some(gateway_log_msg::Msg::QuerySubmitted(log)) if log.client_id == gateway_id => {
-                Some(LogsCollectorEvent::QuerySubmitted(log))
-            }
-            _ => {
-                log::warn!("Invalid gateway log message: {log_msg:?}");
-                None
+        });
+        if let Some(sender) = self.resp_senders.remove(&peer_id) {
+            if sender.send(Ok(logs)).is_err() {
+                log::warn!("Logs response channel closed for {peer_id}");
             }
+        } else {
+            log::warn!("Not expecting query logs for peer {peer_id}");
         }
+        None
     }
 
-    pub fn logs_collected(&mut self, logs_collected: &LogsCollected) {
-        self.inner.base.publish_logs_collected(logs_collected)
+    fn on_failure(&mut self, peer_id: PeerId, error: FetchLogsError) -> Option<LogsCollectorEvent> {
+        log::debug!("Couldn't get query logs from {peer_id}: {error:?}");
+        if let Some(sender) = self.resp_senders.remove(&peer_id) {
+            sender.send(Err(error)).ok();
+        } else {
+            log::warn!("Not expecting query logs for peer {peer_id}");
+        }
+        None
+    }
+
+    pub fn request_logs(
+        &mut self,
+        peer_id: PeerId,
+        request: LogsRequest,
+        resp_tx: oneshot::Sender<Result<QueryLogs, FetchLogsError>>,
+    ) -> Result<(), QueueFull> {
+        let request_size = request.encoded_len() as u64;
+        if request_size > MAX_LOGS_REQUEST_SIZE {
+            log::error!("Logs request size too large: {request_size}");
+            return Ok(());
+        }
+
+        log::debug!(
+            "Requesting logs from {peer_id} since {}, last query id: {:?}",
+            request.from_timestamp_ms,
+            request.last_received_query_id
+        );
+
+        let prev = self.resp_senders.insert(peer_id, resp_tx);
+        if prev.is_some() {
+            log::warn!("Dropping ongoing logs request to {peer_id}");
+        }
+
+        self.inner.logs.try_send_request(peer_id, request)?;
+        Ok(())
     }
 }
 
@@ -145,91 +162,91 @@ impl BehaviourWrapper for LogsCollectorBehaviour {
         ev: <Self::Inner as NetworkBehaviour>::ToSwarm,
     ) -> impl IntoIterator<Item = TToSwarm<Self>> {
         let ev = match ev {
-            InnerBehaviourEvent::Base(BaseBehaviourEvent::WorkerQueryLogs {
-                peer_id,
-                query_logs,
-            }) => self.on_worker_logs(peer_id, query_logs),
-            InnerBehaviourEvent::GatewayLogs(Request {
-                peer_id,
-                request,
-                response_channel,
-            }) => {
-                _ = self.inner.gateway_logs.try_send_response(response_channel, 1);
-                self.on_gateway_log(peer_id, request)
-            }
-            _ => None,
+            InnerBehaviourEvent::Base(_) => None,
+            InnerBehaviourEvent::Logs(client_event) => match client_event {
+                ClientEvent::Response {
+                    peer_id, response, ..
+                } => self.on_worker_logs(peer_id, response),
+                ClientEvent::Timeout {
+                    peer_id, timeout, ..
+                } => self.on_failure(peer_id, FetchLogsError::Timeout(timeout)),
+                ClientEvent::PeerUnknown { peer_id } => {
+                    self.inner.base.find_and_dial(peer_id);
+                    None
+                }
+                ClientEvent::Failure { peer_id, error, .. } => {
+                    self.on_failure(peer_id, FetchLogsError::Failure(error))
+                }
+            },
         };
         ev.map(ToSwarm::GenerateEvent)
     }
 }
 
-struct LogsCollectorTransport {
-    swarm: Swarm<Wrapped<LogsCollectorBehaviour>>,
-    logs_collected_rx: Receiver<LogsCollected>,
-    events_tx: Sender<LogsCollectorEvent>,
+pub struct LogsCollectorTransport {
+    swarm: Mutex<Swarm<Wrapped<LogsCollectorBehaviour>>>,
 }
 
 impl LogsCollectorTransport {
-    pub async fn run(mut self, cancel_token: CancellationToken) {
+    pub async fn request_logs(
+        &self,
+        peer_id: PeerId,
+        request: LogsRequest,
+    ) -> Result<QueryLogs, FetchLogsError> {
+        let (resp_tx, resp_rx) = oneshot::channel();
+        self.swarm
+            .lock()
+            .behaviour_mut()
+            .request_logs(peer_id, request, resp_tx)
+            .map_err(|_| FetchLogsError::Failure("Logs request queue full".to_string()))?;
+        resp_rx
+            .await
+            .map_err(|_| FetchLogsError::Failure("Logs response channel closed".to_string()))?
+    }
+
+    pub fn run(&self, cancel_token: CancellationToken) -> LogsCollectorRunFuture {
         log::info!("Starting logs collector P2P transport");
-        loop {
-            tokio::select! {
-                _ = cancel_token.cancelled() => break,
-                ev = self.swarm.select_next_some() => self.on_swarm_event(ev),
-                Some(logs_collected) = self.logs_collected_rx.recv() => self.swarm.behaviour_mut().logs_collected(&logs_collected),
-            }
+        LogsCollectorRunFuture {
+            transport: self,
+            cancelled: Box::pin(cancel_token.cancelled_owned()),
         }
-        log::info!("Shutting down logs collector P2P transport");
     }
 
-    fn on_swarm_event(&mut self, ev: SwarmEvent<LogsCollectorEvent>) {
+    fn on_swarm_event(&self, ev: SwarmEvent<LogsCollectorEvent>) {
         log::trace!("Swarm event: {ev:?}");
         record_event(&ev);
-        if let SwarmEvent::Behaviour(ev) = ev {
-            self.events_tx.send_lossy(ev)
-        }
-    }
-}
-
-#[derive(Clone)]
-pub struct LogsCollectorTransportHandle {
-    logs_collected_tx: Sender<LogsCollected>,
-    _task_manager: Arc<TaskManager>,
-}
-
-impl LogsCollectorTransportHandle {
-    fn new(
-        logs_collected_tx: Sender<LogsCollected>,
-        transport: LogsCollectorTransport,
-        shutdown_timeout: Duration,
-    ) -> Self {
-        let mut task_manager = TaskManager::new(shutdown_timeout);
-        task_manager.spawn(|c| transport.run(c));
-        Self {
-            logs_collected_tx,
-            _task_manager: Arc::new(task_manager),
-        }
-    }
-
-    pub fn logs_collected(&self, logs_collected: LogsCollected) -> Result<(), QueueFull> {
-        log::debug!("Queueing LogsCollected message: {logs_collected:?}");
-        self.logs_collected_tx.try_send(logs_collected)
     }
 }
 
 pub fn start_transport(
     swarm: Swarm<Wrapped<LogsCollectorBehaviour>>,
-    config: LogsCollectorConfig,
-) -> (impl Stream<Item = LogsCollectorEvent>, LogsCollectorTransportHandle) {
-    let (logs_collected_tx, logs_collected_rx) =
-        new_queue(config.logs_collected_queue_size, "logs_collected");
-    let (events_tx, events_rx) = new_queue(config.events_queue_size, "events");
+    _config: LogsCollectorConfig,
+) -> LogsCollectorTransport {
     let transport = LogsCollectorTransport {
-        swarm,
-        logs_collected_rx,
-        events_tx,
+        swarm: Mutex::new(swarm),
     };
-    let handle =
-        LogsCollectorTransportHandle::new(logs_collected_tx, transport, config.shutdown_timeout);
-    (events_rx, handle)
+    transport
+}
+
+pub struct LogsCollectorRunFuture<'t> {
+    transport: &'t LogsCollectorTransport,
+    cancelled: Pin<Box<WaitForCancellationFutureOwned>>,
+}
+
+impl<'t> Future for LogsCollectorRunFuture<'t> {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        if self.cancelled.poll_unpin(cx).is_ready() {
+            log::info!("Shutting down logs collector P2P transport");
+            return Poll::Ready(());
+        }
+        loop {
+            match self.transport.swarm.lock().poll_next_unpin(cx) {
+                Poll::Pending => return Poll::Pending,
+                Poll::Ready(Some(ev)) => self.transport.on_swarm_event(ev),
+                Poll::Ready(None) => unreachable!("Swarm stream ended"),
+            }
        }
    }
}
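Since logs are now pulled rather than pushed, the collector drives the exchange. A usage sketch, assuming a `LogsCollectorTransport` built via `start_transport` with its `run` future being driven elsewhere; the cursor values are illustrative and the `last_received_query_id` type is inferred from the debug log above:

    use libp2p::PeerId;
    use sqd_messages::LogsRequest;
    use sqd_network_transport::LogsCollectorTransport;

    async fn poll_worker_logs(transport: &LogsCollectorTransport, worker: PeerId) {
        // Cursor values are illustrative; real ones come from the collector's storage
        let request = LogsRequest {
            from_timestamp_ms: 0,
            last_received_query_id: None,
            ..Default::default()
        };
        match transport.request_logs(worker, request).await {
            Ok(logs) => log::info!("Got {} logs from {worker}", logs.queries_executed.len()),
            Err(e) => log::warn!("Fetching logs from {worker} failed: {e}"),
        }
    }
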
diff --git a/crates/transport/src/actors/observer.rs b/crates/transport/src/actors/observer.rs
index c457dbf..53253d9 100644
--- a/crates/transport/src/actors/observer.rs
+++ b/crates/transport/src/actors/observer.rs
@@ -9,7 +9,7 @@ use libp2p::{
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 
-use sqd_messages::{LogsCollected, Ping, QueryLogs};
+use sqd_messages::Heartbeat;
 
 use crate::{
     behaviour::{
@@ -22,28 +22,18 @@ use crate::{
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum ObserverEvent {
-    Ping {
-        peer_id: PeerId,
-        ping: Ping,
-    },
-    LogsCollected(LogsCollected),
-    WorkerQueryLogs {
-        peer_id: PeerId,
-        query_logs: QueryLogs,
-    },
+    Ping { peer_id: PeerId, ping: Heartbeat },
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct ObserverConfig {
-    pub logs_collector_id: PeerId,
     pub events_queue_size: usize,
     pub shutdown_timeout: Duration,
 }
 
 impl ObserverConfig {
-    pub fn new(logs_collector_id: PeerId) -> Self {
+    pub fn new() -> Self {
         Self {
-            logs_collector_id,
             events_queue_size: 100,
             shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
         }
@@ -55,28 +45,17 @@ pub struct ObserverBehaviour {
 }
 
 impl ObserverBehaviour {
-    pub fn new(mut base: BaseBehaviour, logs_collector_id: PeerId) -> Wrapped<Self> {
-        base.subscribe_pings();
-        base.subscribe_worker_logs(logs_collector_id);
-        base.allow_peer(logs_collector_id);
+    pub fn new(mut base: BaseBehaviour) -> Wrapped<Self> {
+        base.subscribe_heartbeats();
         Self { base: base.into() }.into()
     }
 
     fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option<ObserverEvent> {
         match ev {
-            BaseBehaviourEvent::LogsCollected(logs_collected) => {
-                Some(ObserverEvent::LogsCollected(logs_collected))
-            }
-            BaseBehaviourEvent::Ping { peer_id, ping } => {
-                Some(ObserverEvent::Ping { peer_id, ping })
-            }
-            BaseBehaviourEvent::WorkerQueryLogs {
-                peer_id,
-                query_logs,
-            } => Some(ObserverEvent::WorkerQueryLogs {
+            BaseBehaviourEvent::Heartbeat {
                 peer_id,
-                query_logs,
-            }),
+                heartbeat: ping,
+            } => Some(ObserverEvent::Ping { peer_id, ping }),
             _ => None,
         }
     }
diff --git a/crates/transport/src/actors/pings_collector.rs b/crates/transport/src/actors/pings_collector.rs
index a13ed35..ea72ce2 100644
--- a/crates/transport/src/actors/pings_collector.rs
+++ b/crates/transport/src/actors/pings_collector.rs
@@ -21,9 +21,9 @@ use crate::{
 };
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Ping {
+pub struct Heartbeat {
     pub peer_id: PeerId,
-    pub ping: sqd_messages::Ping,
+    pub heartbeat: sqd_messages::Heartbeat,
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -47,14 +47,14 @@ pub struct PingsCollectorBehaviour {
 
 impl PingsCollectorBehaviour {
     pub fn new(mut base: BaseBehaviour) -> Wrapped<Self> {
-        base.subscribe_pings();
+        base.subscribe_heartbeats();
         Self { base: base.into() }.into()
     }
 }
 
 impl BehaviourWrapper for PingsCollectorBehaviour {
     type Inner = Wrapped<BaseBehaviour>;
-    type Event = Ping;
+    type Event = Heartbeat;
 
     fn inner(&mut self) -> &mut Self::Inner {
         &mut self.base
@@ -65,10 +65,10 @@
         ev: <Self::Inner as NetworkBehaviour>::ToSwarm,
     ) -> impl IntoIterator<Item = TToSwarm<Self>> {
         match ev {
-            BaseBehaviourEvent::Ping { peer_id, ping } => {
-                log::debug!("Got ping from {peer_id}");
-                log::trace!("{ping:?}");
-                Some(ToSwarm::GenerateEvent(Ping { peer_id, ping }))
+            BaseBehaviourEvent::Heartbeat { peer_id, heartbeat } => {
+                log::debug!("Got heartbeat from {peer_id}");
+                log::trace!("{heartbeat:?}");
+                Some(ToSwarm::GenerateEvent(Heartbeat { peer_id, heartbeat }))
             }
             _ => None,
         }
@@ -77,7 +77,7 @@ impl BehaviourWrapper for PingsCollectorBehaviour {
 
 struct PingsCollectorTransport {
     swarm: Swarm<Wrapped<PingsCollectorBehaviour>>,
-    pings_tx: Sender<Ping>,
+    heartbeats_tx: Sender<Heartbeat>,
 }
 
 impl PingsCollectorTransport {
@@ -92,11 +92,11 @@ impl PingsCollectorTransport {
         log::info!("Shutting down pings collector P2P transport");
     }
 
-    fn on_swarm_event(&mut self, ev: SwarmEvent<Ping>) {
+    fn on_swarm_event(&mut self, ev: SwarmEvent<Heartbeat>) {
         log::trace!("Swarm event: {ev:?}");
         record_event(&ev);
-        if let SwarmEvent::Behaviour(ping) = ev {
-            self.pings_tx.send_lossy(ping)
+        if let SwarmEvent::Behaviour(heartbeat) = ev {
+            self.heartbeats_tx.send_lossy(heartbeat)
         }
     }
 }
@@ -119,9 +119,12 @@ impl PingsCollectorTransportHandle {
 pub fn start_transport(
     swarm: Swarm<Wrapped<PingsCollectorBehaviour>>,
     config: PingsCollectorConfig,
-) -> (impl Stream<Item = Ping>, PingsCollectorTransportHandle) {
-    let (pings_tx, pings_rx) = new_queue(config.events_queue_size, "events");
-    let transport = PingsCollectorTransport { swarm, pings_tx };
+) -> (impl Stream<Item = Heartbeat>, PingsCollectorTransportHandle) {
+    let (heartbeats_tx, heartbeats_rx) = new_queue(config.events_queue_size, "events");
+    let transport = PingsCollectorTransport {
+        swarm,
+        heartbeats_tx,
+    };
     let handle = PingsCollectorTransportHandle::new(transport, config.shutdown_timeout);
-    (pings_rx, handle)
+    (heartbeats_rx, handle)
 }
diff --git a/crates/transport/src/actors/scheduler.rs b/crates/transport/src/actors/scheduler.rs
index a7f8249..45a4be6 100644
--- a/crates/transport/src/actors/scheduler.rs
+++ b/crates/transport/src/actors/scheduler.rs
@@ -8,23 +8,16 @@ use libp2p::{
     PeerId, Swarm,
 };
 use libp2p_swarm_derive::NetworkBehaviour;
-use prost::Message;
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 
-use sqd_messages::{Ping, Pong};
-
-#[cfg(feature = "metrics")]
-use crate::metrics::PONGS_SENT;
+use sqd_messages::Heartbeat;
 
 use crate::{
     behaviour::{
         base::{BaseBehaviour, BaseBehaviourEvent, PeerProbed, ProbeResult, TryProbeError},
-        request_client::{ClientBehaviour, ClientConfig, ClientEvent},
         wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
     },
-    codec::{ProtoCodec, ACK_SIZE},
-    protocol::{MAX_PONG_SIZE, PONG_PROTOCOL},
     record_event,
     util::{new_queue, Receiver, Sender, TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
     QueueFull,
@@ -32,28 +25,22 @@ use crate::{
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum SchedulerEvent {
-    /// Ping received from a worker
-    Ping { peer_id: PeerId, ping: Ping },
+    /// Heartbeat received from a worker
+    Heartbeat {
+        peer_id: PeerId,
+        heartbeat: Heartbeat,
+    },
 
     /// Peer was probed for reachability
     PeerProbed { peer_id: PeerId, reachable: bool },
 }
 
-type PongBehaviour = Wrapped<ClientBehaviour<ProtoCodec<Pong, u32>>>;
-
 #[derive(NetworkBehaviour)]
 pub struct InnerBehaviour {
     base: Wrapped<BaseBehaviour>,
-    pong: PongBehaviour,
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct SchedulerConfig {
-    /// `ClientConfig` for the `pong` protocol
-    pub pong_config: ClientConfig,
-    /// Maximum size of a pong message in bytes (default: `MAX_PONG_SIZE`)
-    pub max_pong_size: u64,
-    /// Maximum number of outbound pongs stored in a queue (default: 1000)
-    pub pongs_queue_size: usize,
     /// Maximum number of pending peer probes (reachability checks) stored in a queue (default: 1000)
     pub probes_queue_size: usize,
     /// Maximum number of inbound events (`SchedulerEvent`) stored in a queue (default: 1000)
@@ -67,9 +54,6 @@ pub struct SchedulerConfig {
 impl Default for SchedulerConfig {
     fn default() -> Self {
         Self {
-            pong_config: Default::default(),
-            max_pong_size: MAX_PONG_SIZE,
-            pongs_queue_size: 1000,
             probes_queue_size: 1000,
             events_queue_size: 1000,
             ignore_existing_conns: false,
@@ -83,18 +67,10 @@ pub struct SchedulerBehaviour {
 }
 
 impl SchedulerBehaviour {
-    pub fn new(mut base: BaseBehaviour, config: SchedulerConfig) -> Wrapped<Self> {
-        base.subscribe_pings();
+    pub fn new(mut base: BaseBehaviour, _config: SchedulerConfig) -> Wrapped<Self> {
+        base.subscribe_heartbeats();
         Self {
-            inner: InnerBehaviour {
-                base: base.into(),
-                pong: ClientBehaviour::new(
-                    ProtoCodec::new(config.max_pong_size, ACK_SIZE),
-                    PONG_PROTOCOL,
-                    config.pong_config,
-                )
-                .into(),
-            },
+            inner: InnerBehaviour { base: base.into() },
         }
         .into()
     }
@@ -102,16 +78,15 @@ impl SchedulerBehaviour {
     #[rustfmt::skip]
     fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option<SchedulerEvent> {
         match ev {
-            BaseBehaviourEvent::Ping { peer_id, ping } => self.on_ping(peer_id, ping),
+            BaseBehaviourEvent::Heartbeat { peer_id, heartbeat } => self.on_heartbeat(peer_id, heartbeat),
             BaseBehaviourEvent::PeerProbed(PeerProbed { peer_id, result }) => self.on_peer_probed(peer_id, &result),
-            _ => None
         }
     }
 
-    fn on_ping(&mut self, peer_id: PeerId, ping: Ping) -> Option<SchedulerEvent> {
-        log::debug!("Got ping from {peer_id}");
-        log::trace!("{ping:?}");
-        Some(SchedulerEvent::Ping { peer_id, ping })
+    fn on_heartbeat(&mut self, peer_id: PeerId, heartbeat: Heartbeat) -> Option<SchedulerEvent> {
+        log::debug!("Got heartbeat from {peer_id}");
+        log::trace!("{heartbeat:?}");
+        Some(SchedulerEvent::Heartbeat { peer_id, heartbeat })
     }
 
     fn on_peer_probed(&mut self, peer_id: PeerId, result: &ProbeResult) -> Option<SchedulerEvent> {
@@ -120,36 +95,6 @@ impl SchedulerBehaviour {
         Some(SchedulerEvent::PeerProbed { peer_id, reachable })
     }
 
-    fn on_pong_event(&mut self, ev: ClientEvent<u32>) -> Option<SchedulerEvent> {
-        match ev {
-            ClientEvent::Response { .. } => {} // response is just ACK, no useful information
-            ClientEvent::PeerUnknown { peer_id } => self.inner.base.find_and_dial(peer_id),
-            ClientEvent::Timeout { peer_id, .. } => {
-                log::warn!("Sending pong to {peer_id} timed out")
-            }
-            ClientEvent::Failure { peer_id, error, .. } => {
-                log::warn!("Sending pong to {peer_id} failed: {error}")
-            }
-        }
-        None
-    }
-
-    pub fn send_pong(&mut self, peer_id: PeerId, pong: Pong) {
-        log::debug!("Sending pong to {peer_id}");
-
-        // Check pong size limit
-        let pong_size = pong.encoded_len() as u64;
-        if pong_size > MAX_PONG_SIZE {
-            return log::error!("Pong size too large: {pong_size}");
-        }
-
-        if self.inner.pong.try_send_request(peer_id, pong).is_err() {
-            log::error!("Cannot send pong to {peer_id}: outbound queue full")
-        }
-        #[cfg(feature = "metrics")]
-        PONGS_SENT.inc();
-    }
-
     pub fn try_probe_peer(&mut self, peer_id: PeerId) -> Result<(), TryProbeError> {
         self.inner.base.try_probe_dht(peer_id)
     }
@@ -169,7 +114,6 @@
     ) -> impl IntoIterator<Item = TToSwarm<Self>> {
         let ev = match ev {
             InnerBehaviourEvent::Base(ev) => self.on_base_event(ev),
-            InnerBehaviourEvent::Pong(ev) => self.on_pong_event(ev),
         };
         ev.map(ToSwarm::GenerateEvent)
     }
@@ -177,7 +121,6 @@
 
 struct SchedulerTransport {
     swarm: Swarm<Wrapped<SchedulerBehaviour>>,
-    pongs_rx: Receiver<(PeerId, Pong)>,
     probes_rx: Receiver<PeerId>,
     events_tx: Sender<SchedulerEvent>,
     ignore_existing_conns: bool,
@@ -190,7 +133,6 @@ impl SchedulerTransport {
            tokio::select! {
                 _ = cancel_token.cancelled() => break,
                 ev = self.swarm.select_next_some() => self.on_swarm_event(ev),
-                Some((peer_id, pong)) = self.pongs_rx.recv() => self.swarm.behaviour_mut().send_pong(peer_id, pong),
                 Some(peer_id) = self.probes_rx.recv() => self.probe_peer(peer_id),
             }
         }
@@ -226,14 +168,12 @@ impl SchedulerTransport {
 
 #[derive(Clone)]
 pub struct SchedulerTransportHandle {
-    pongs_tx: Sender<(PeerId, Pong)>,
     probes_tx: Sender<PeerId>,
     _task_manager: Arc<TaskManager>,
 }
 
 impl SchedulerTransportHandle {
     fn new(
-        pongs_tx: Sender<(PeerId, Pong)>,
         probes_tx: Sender<PeerId>,
         transport: SchedulerTransport,
         shutdown_timeout: Duration,
@@ -241,17 +181,11 @@ impl SchedulerTransportHandle {
         let mut task_manager = TaskManager::new(shutdown_timeout);
         task_manager.spawn(|c| transport.run(c));
         Self {
-            pongs_tx,
             probes_tx,
             _task_manager: Arc::new(task_manager),
         }
     }
 
-    pub fn send_pong(&self, peer_id: PeerId, pong: Pong) -> Result<(), QueueFull> {
-        log::debug!("Queueing pong to {peer_id}: {pong:?}");
-        self.pongs_tx.try_send((peer_id, pong))
-    }
-
     pub fn probe_peer(&self, peer_id: PeerId) -> Result<(), QueueFull> {
         log::debug!("Queueing probe of peer {peer_id}");
         self.probes_tx.try_send(peer_id)
@@ -262,17 +196,14 @@ pub fn start_transport(
     swarm: Swarm<Wrapped<SchedulerBehaviour>>,
     config: SchedulerConfig,
 ) -> (impl Stream<Item = SchedulerEvent>, SchedulerTransportHandle) {
-    let (pongs_tx, pongs_rx) = new_queue(config.pongs_queue_size, "pongs");
     let (probes_tx, probes_rx) = new_queue(config.probes_queue_size, "probes");
     let (events_tx, events_rx) = new_queue(config.events_queue_size, "events");
     let transport = SchedulerTransport {
         swarm,
-        pongs_rx,
         probes_rx,
         events_tx,
         ignore_existing_conns: config.ignore_existing_conns,
     };
-    let handle =
-        SchedulerTransportHandle::new(pongs_tx, probes_tx, transport, config.shutdown_timeout);
+    let handle = SchedulerTransportHandle::new(probes_tx, transport, config.shutdown_timeout);
     (events_rx, handle)
 }
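The scheduler now only listens for heartbeats and probes workers for reachability; it no longer replies with pongs. A sketch of the remaining flow, assuming the `(events, handle)` pair returned by `start_transport` (the probing loop itself is illustrative):

    use futures::{Stream, StreamExt};
    use libp2p::PeerId;
    use sqd_network_transport::{SchedulerEvent, SchedulerTransportHandle};

    async fn probe_worker(
        handle: &SchedulerTransportHandle,
        mut events: impl Stream<Item = SchedulerEvent> + Unpin,
        worker: PeerId,
    ) {
        if handle.probe_peer(worker).is_err() {
            log::warn!("Probe queue full");
            return;
        }
        while let Some(ev) = events.next().await {
            match ev {
                SchedulerEvent::PeerProbed { peer_id, reachable } if peer_id == worker => {
                    log::info!("{peer_id} reachable: {reachable}");
                    return;
                }
                SchedulerEvent::Heartbeat { peer_id, .. } => {
                    log::debug!("Heartbeat from {peer_id}")
                }
                _ => {}
            }
        }
    }
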
diff --git a/crates/transport/src/actors/worker.rs b/crates/transport/src/actors/worker.rs
index 3eedda7..b1f9bc8 100644
--- a/crates/transport/src/actors/worker.rs
+++ b/crates/transport/src/actors/worker.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 
 use futures::StreamExt;
 use futures_core::Stream;
@@ -8,17 +8,10 @@ use libp2p::{
     PeerId, Swarm,
 };
 use libp2p_swarm_derive::NetworkBehaviour;
-use prost::Message;
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 
-use sqd_messages::{
-    query_result, signatures::SignedMessage, LogsCollected, Ping, Pong, Query, QueryExecuted,
-    QueryResult,
-};
-
-#[cfg(feature = "metrics")]
-use crate::metrics::PONGS_RECEIVED;
+use sqd_messages::{Heartbeat, LogsRequest, Query, QueryLogs, QueryResult};
 
 use crate::{
     behaviour::{
@@ -26,192 +19,122 @@ use crate::{
         request_server::{Request, ServerBehaviour},
         wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
     },
-    codec::{ProtoCodec, ACK_SIZE},
+    codec::ProtoCodec,
     protocol::{
-        MAX_PONG_SIZE, MAX_QUERY_RESULT_SIZE, MAX_QUERY_SIZE, PONG_PROTOCOL, QUERY_PROTOCOL,
+        MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE,
+        QUERY_PROTOCOL, WORKER_LOGS_PROTOCOL,
     },
     record_event,
     util::{new_queue, Receiver, Sender, TaskManager, DEFAULT_SHUTDOWN_TIMEOUT},
     QueueFull,
 };
 
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug)]
 pub enum WorkerEvent {
-    /// Pong message received from the scheduler
-    Pong(Pong),
     /// Query received from a gateway
-    Query { peer_id: PeerId, query: Query },
-    /// Logs up to `last_seq_no` have been saved by logs collector
-    LogsCollected { last_seq_no: Option<u64> },
+    Query {
+        peer_id: PeerId,
+        query: Query,
+        /// If this channel is dropped, the connection will be closed
+        resp_chan: ResponseChannel<QueryResult>,
+    },
+    /// Logs requested by a collector
+    LogsRequest {
+        request: LogsRequest,
+        /// If this channel is dropped, the connection will be closed
+        resp_chan: ResponseChannel<QueryLogs>,
+    },
 }
 
-type PongBehaviour = Wrapped<ServerBehaviour<ProtoCodec<Pong, u32>>>;
 type QueryBehaviour = Wrapped<ServerBehaviour<ProtoCodec<Query, QueryResult>>>;
+type LogsBehaviour = Wrapped<ServerBehaviour<ProtoCodec<LogsRequest, QueryLogs>>>;
 
 #[derive(NetworkBehaviour)]
 pub struct InnerBehaviour {
     base: Wrapped<BaseBehaviour>,
-    pong: PongBehaviour,
     query: QueryBehaviour,
+    logs: LogsBehaviour,
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct WorkerConfig {
-    pub scheduler_id: PeerId,
-    pub logs_collector_id: PeerId,
-    pub max_pong_size: u64,
-    pub max_query_size: u64,
-    pub max_query_result_size: u64,
-    pub pings_queue_size: usize,
+    pub heartbeats_queue_size: usize,
     pub query_results_queue_size: usize,
     pub logs_queue_size: usize,
     pub events_queue_size: usize,
     pub shutdown_timeout: Duration,
     pub query_execution_timeout: Duration,
-    pub pong_ack_timeout: Duration,
+    pub send_logs_timeout: Duration,
 }
 
 impl WorkerConfig {
-    pub fn new(scheduler_id: PeerId, logs_collector_id: PeerId) -> Self {
+    pub fn new() -> Self {
         Self {
-            scheduler_id,
-            logs_collector_id,
-            max_pong_size: MAX_PONG_SIZE,
-            max_query_size: MAX_QUERY_SIZE,
-            max_query_result_size: MAX_QUERY_RESULT_SIZE,
-            pings_queue_size: 100,
+            heartbeats_queue_size: 100,
             query_results_queue_size: 100,
-            logs_queue_size: 100,
+            logs_queue_size: 1,
             events_queue_size: 100,
             shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
             query_execution_timeout: Duration::from_secs(20),
-            pong_ack_timeout: Duration::from_secs(5),
+            send_logs_timeout: Duration::from_secs(5),
         }
     }
 }
 
 pub struct WorkerBehaviour {
     inner: InnerBehaviour,
-    local_peer_id: String,
-    scheduler_id: PeerId,
-    query_response_channels: HashMap<String, ResponseChannel<QueryResult>>,
 }
 
 impl WorkerBehaviour {
-    pub fn new(
-        mut base: BaseBehaviour,
-        local_peer_id: PeerId,
-        config: WorkerConfig,
-    ) -> Wrapped<Self> {
-        base.subscribe_pings();
-        base.subscribe_worker_logs(config.logs_collector_id);
-        base.allow_peer(config.logs_collector_id);
-        base.allow_peer(config.scheduler_id);
+    pub fn new(mut base: BaseBehaviour, config: WorkerConfig) -> Wrapped<Self> {
+        base.subscribe_heartbeats();
         Self {
             inner: InnerBehaviour {
                 base: base.into(),
-                pong: ServerBehaviour::new(
-                    ProtoCodec::new(config.max_pong_size, ACK_SIZE),
-                    PONG_PROTOCOL,
-                    config.pong_ack_timeout,
-                )
-                .into(),
                 query: ServerBehaviour::new(
-                    ProtoCodec::new(config.max_query_size, config.max_query_result_size),
+                    ProtoCodec::new(MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE),
                     QUERY_PROTOCOL,
                     config.query_execution_timeout,
                 )
                 .into(),
+                logs: ServerBehaviour::new(
+                    ProtoCodec::new(MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE),
+                    WORKER_LOGS_PROTOCOL,
+                    config.send_logs_timeout,
+                )
+                .into(),
             },
-            local_peer_id: local_peer_id.to_base58(),
-            scheduler_id: config.scheduler_id,
-            query_response_channels: Default::default(),
         }
         .into()
     }
 
-    fn on_base_event(&mut self, ev: BaseBehaviourEvent) -> Option<WorkerEvent> {
-        match ev {
-            BaseBehaviourEvent::LogsCollected(logs_collected) => {
-                self.on_logs_collected(logs_collected)
-            }
-            _ => None,
-        }
-    }
-
-    fn on_logs_collected(&mut self, mut logs_collected: LogsCollected) -> Option<WorkerEvent> {
-        log::debug!("Received logs collected message");
-        // Extract last_seq_no for the local worker
-        let last_seq_no = logs_collected.sequence_numbers.remove(&self.local_peer_id);
-        Some(WorkerEvent::LogsCollected { last_seq_no })
+    fn on_base_event(&mut self, _: BaseBehaviourEvent) -> Option<WorkerEvent> {
+        None
     }
 
     fn on_query(
         &mut self,
         peer_id: PeerId,
-        mut query: Query,
-        resp_chan: Option<ResponseChannel<QueryResult>>,
+        query: Query,
+        resp_chan: ResponseChannel<QueryResult>,
     ) -> Option<WorkerEvent> {
-        // Verify query signature
-        if !query.verify_signature(&peer_id) {
-            log::warn!("Dropping query with invalid signature from {peer_id}");
-            return None;
-        }
-        // Check if query has ID
-        let query_id = match &query.query_id {
-            Some(id) => id.clone(),
-            None => {
-                log::warn!("Dropping query without ID from {peer_id}");
-                return None;
-            }
-        };
-        log::debug!("Query {query_id} verified");
-        if let Some(resp_chan) = resp_chan {
-            self.query_response_channels.insert(query_id, resp_chan);
-        }
-        Some(WorkerEvent::Query { peer_id, query })
-    }
-
-    fn on_pong_event(
-        &mut self,
-        Request {
+        Some(WorkerEvent::Query {
             peer_id,
-            request,
-            response_channel,
-        }: Request<Pong, u32>,
-    ) -> Option<WorkerEvent> {
-        if peer_id != self.scheduler_id {
-            log::warn!("Peer {peer_id} impersonating scheduler");
-            return None;
-        }
-        log::debug!("Received pong from scheduler: {request:?}");
-        #[cfg(feature = "metrics")]
-        PONGS_RECEIVED.inc();
-        // Send minimal response to avoid getting errors
-        _ = self.inner.pong.try_send_response(response_channel, 1);
-        Some(WorkerEvent::Pong(request))
+            query,
+            resp_chan,
+        })
     }
 
-    pub fn send_ping(&mut self, ping: Ping) {
-        self.inner.base.publish_ping(ping);
+    pub fn send_heartbeat(&mut self, heartbeat: Heartbeat) {
+        self.inner.base.publish_heartbeat(heartbeat);
     }
 
-    pub fn send_query_result(&mut self, mut result: QueryResult) {
+    pub fn send_query_result(
+        &mut self,
+        result: QueryResult,
+        resp_chan: ResponseChannel<QueryResult>,
+    ) {
         log::debug!("Sending query result {result:?}");
-        let Some(resp_chan) = self.query_response_channels.remove(&result.query_id) else {
-            return log::error!("No response channel for query: {}", result.query_id);
-        };
-
-        // Check query result size limit
-        let result_size = result.encoded_len() as u64;
-        if result_size > MAX_QUERY_RESULT_SIZE {
-            let err = format!("Query result size too large: {result_size}");
-            log::error!("{err}");
-            result = QueryResult {
-                query_id: result.query_id,
-                result: Some(query_result::Result::ServerError(err)),
-            };
-        }
         self.inner
             .query
@@ -219,9 +142,22 @@ impl WorkerBehaviour {
             .try_send_response(resp_chan, result)
             .unwrap_or_else(|e| log::error!("Cannot send result for query {}", e.query_id));
     }
 
-    pub fn send_logs(&mut self, logs: Vec<QueryExecuted>) {
-        log::debug!("Sending query logs");
-        self.inner.base.publish_worker_logs(logs);
+    fn on_logs_request(
+        &mut self,
+        _peer_id: PeerId,
+        request: LogsRequest,
+        resp_chan: ResponseChannel<QueryLogs>,
+    ) -> Option<WorkerEvent> {
+        Some(WorkerEvent::LogsRequest { request, resp_chan })
+    }
+
+    pub fn send_logs(&mut self, logs: QueryLogs, resp_chan: ResponseChannel<QueryLogs>) {
+        log::debug!("Sending {} query logs", logs.queries_executed.len());
+
+        self.inner
+            .logs
+            .try_send_response(resp_chan, logs)
+            .unwrap_or_else(|_| log::error!("Couldn't send logs"));
     }
 }
 
@@ -239,12 +175,16 @@
     ) -> impl IntoIterator<Item = TToSwarm<Self>> {
         let ev = match ev {
             InnerBehaviourEvent::Base(ev) => self.on_base_event(ev),
-            InnerBehaviourEvent::Pong(ev) => self.on_pong_event(ev),
             InnerBehaviourEvent::Query(Request {
                 peer_id,
                 request,
                 response_channel,
-            }) => self.on_query(peer_id, request, Some(response_channel)),
+            }) => self.on_query(peer_id, request, response_channel),
+            InnerBehaviourEvent::Logs(Request {
+                peer_id,
+                request,
+                response_channel,
+            }) => self.on_logs_request(peer_id, request, response_channel),
         };
         ev.map(ToSwarm::GenerateEvent)
     }
@@ -252,9 +192,9 @@
 
 struct WorkerTransport {
     swarm: Swarm<Wrapped<WorkerBehaviour>>,
-    pings_rx: Receiver<Ping>,
-    query_results_rx: Receiver<QueryResult>,
-    logs_rx: Receiver<Vec<QueryExecuted>>,
+    heartbeats_rx: Receiver<Heartbeat>,
+    query_results_rx: Receiver<(QueryResult, ResponseChannel<QueryResult>)>,
+    logs_rx: Receiver<(QueryLogs, ResponseChannel<QueryLogs>)>,
     events_tx: Sender<WorkerEvent>,
 }
 
@@ -265,9 +205,9 @@ impl WorkerTransport {
             tokio::select! {
                 _ = cancel_token.cancelled() => break,
                 ev = self.swarm.select_next_some() => self.on_swarm_event(ev),
-                Some(ping) = self.pings_rx.recv() => self.swarm.behaviour_mut().send_ping(ping),
-                Some(res) = self.query_results_rx.recv() => self.swarm.behaviour_mut().send_query_result(res),
-                Some(logs) = self.logs_rx.recv() => self.swarm.behaviour_mut().send_logs(logs),
+                Some(heartbeat) = self.heartbeats_rx.recv() => self.swarm.behaviour_mut().send_heartbeat(heartbeat),
+                Some((res, resp_chan)) = self.query_results_rx.recv() => self.swarm.behaviour_mut().send_query_result(res, resp_chan),
+                Some((logs, resp_chan)) = self.logs_rx.recv() => self.swarm.behaviour_mut().send_logs(logs, resp_chan),
             }
         }
         log::info!("Shutting down worker P2P transport");
@@ -284,43 +224,51 @@ impl WorkerTransport {
 
 #[derive(Clone)]
 pub struct WorkerTransportHandle {
-    pings_tx: Sender<Ping>,
-    query_results_tx: Sender<QueryResult>,
-    logs_tx: Sender<Vec<QueryExecuted>>,
+    heartbeats_tx: Sender<Heartbeat>,
+    query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
+    logs_tx: Sender<(QueryLogs, ResponseChannel<QueryLogs>)>,
     _task_manager: Arc<TaskManager>, // This ensures that transport is stopped when the last handle is dropped
 }
 
 impl WorkerTransportHandle {
     fn new(
-        pings_tx: Sender<Ping>,
-        query_results_tx: Sender<QueryResult>,
-        logs_tx: Sender<Vec<QueryExecuted>>,
+        heartbeats_tx: Sender<Heartbeat>,
+        query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
+        logs_tx: Sender<(QueryLogs, ResponseChannel<QueryLogs>)>,
         transport: WorkerTransport,
         shutdown_timeout: Duration,
     ) -> Self {
         let mut task_manager = TaskManager::new(shutdown_timeout);
         task_manager.spawn(|c| transport.run(c));
         Self {
-            pings_tx,
+            heartbeats_tx,
             query_results_tx,
             logs_tx,
             _task_manager: Arc::new(task_manager),
         }
     }
 
-    pub fn send_ping(&self, ping: Ping) -> Result<(), QueueFull> {
-        log::trace!("Queueing ping {ping:?}");
-        self.pings_tx.try_send(ping)
+    pub fn send_heartbeat(&self, heartbeat: Heartbeat) -> Result<(), QueueFull> {
+        log::trace!("Queueing heartbeat {heartbeat:?}");
+        self.heartbeats_tx.try_send(heartbeat)
     }
 
-    pub fn send_query_result(&self, result: QueryResult) -> Result<(), QueueFull> {
+    pub fn send_query_result(
+        &self,
+        result: QueryResult,
+        resp_chan: ResponseChannel<QueryResult>,
+    ) -> Result<(), QueueFull> {
         log::debug!("Queueing query result {result:?}");
-        self.query_results_tx.try_send(result)
+        self.query_results_tx.try_send((result, resp_chan))
     }
 
-    pub fn send_logs(&self, logs: Vec<QueryExecuted>) -> Result<(), QueueFull> {
-        log::debug!("Queueing {} query logs", logs.len());
-        self.logs_tx.try_send(logs)
+    pub fn send_logs(
+        &self,
+        logs: QueryLogs,
+        resp_chan: ResponseChannel<QueryLogs>,
+    ) -> Result<(), QueueFull> {
+        log::debug!("Queueing {} query logs", logs.queries_executed.len());
+        self.logs_tx.try_send((logs, resp_chan))
     }
 }
 
@@ -328,20 +276,20 @@ pub fn start_transport(
     swarm: Swarm<Wrapped<WorkerBehaviour>>,
     config: WorkerConfig,
 ) -> (impl Stream<Item = WorkerEvent>, WorkerTransportHandle) {
-    let (pings_tx, pings_rx) = new_queue(config.pings_queue_size, "pings");
+    let (heartbeats_tx, heartbeats_rx) = new_queue(config.heartbeats_queue_size, "heartbeats");
     let (query_results_tx, query_results_rx) =
         new_queue(config.query_results_queue_size, "query_results");
     let (logs_tx, logs_rx) = new_queue(config.logs_queue_size, "logs");
     let (events_tx, events_rx) = new_queue(config.events_queue_size, "events");
     let transport = WorkerTransport {
         swarm,
-        pings_rx,
+        heartbeats_rx,
         query_results_rx,
         logs_rx,
         events_tx,
     };
     let handle = WorkerTransportHandle::new(
-        pings_tx,
+        heartbeats_tx,
         query_results_tx,
         logs_tx,
         transport,
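The worker side of the pull-based logs flow now answers each `WorkerEvent::LogsRequest` through its response channel. A sketch under assumptions: `load_logs_since` is a hypothetical stand-in for the worker's local log store, and `QueryLogs` is filled via `Default` for any fields not shown in this patch:

    use sqd_messages::{LogsRequest, QueryExecuted, QueryLogs};
    use sqd_network_transport::{ResponseChannel, WorkerTransportHandle};

    // Hypothetical lookup into the worker's local log store
    fn load_logs_since(_from_timestamp_ms: u64) -> Vec<QueryExecuted> {
        Vec::new()
    }

    fn on_logs_request(
        handle: &WorkerTransportHandle,
        request: LogsRequest,
        resp_chan: ResponseChannel<QueryLogs>,
    ) {
        let logs = QueryLogs {
            queries_executed: load_logs_since(request.from_timestamp_ms),
            ..Default::default()
        };
        if handle.send_logs(logs, resp_chan).is_err() {
            // The logs queue has size 1 by default; dropping resp_chan closes the connection
            log::warn!("Logs queue full, dropping response");
        }
    }
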
diff --git a/crates/transport/src/behaviour/base.rs b/crates/transport/src/behaviour/base.rs
index 27776d5..ef653cd 100644
--- a/crates/transport/src/behaviour/base.rs
+++ b/crates/transport/src/behaviour/base.rs
@@ -3,7 +3,7 @@ use std::{
     num::NonZeroUsize,
     sync::Arc,
     task::{Context, Poll},
-    time::{Duration, SystemTime},
+    time::Duration,
     vec,
 };
 
@@ -34,15 +34,11 @@
 use prost::Message;
 use serde::{Deserialize, Serialize};
 
 use sqd_contract_client::{Client as ContractClient, NetworkNodes};
-use sqd_messages::{
-    signatures::SignedMessage, worker_logs_msg, LogsCollected, Ping, QueryExecuted, QueryLogs,
-    WorkerLogsMsg,
-};
+use sqd_messages::Heartbeat;
 
 #[cfg(feature = "metrics")]
 use crate::metrics::{
-    ACTIVE_CONNECTIONS, LOGS_COLLECTED_PUBLISHED, LOGS_COLLECTED_RECEIVED, ONGOING_PROBES,
-    ONGOING_QUERIES, PINGS_PUBLISHED, PINGS_RECEIVED, WORKER_LOGS_PUBLISHED, WORKER_LOGS_RECEIVED,
+    ACTIVE_CONNECTIONS, HEARTBEATS_PUBLISHED, HEARTBEATS_RECEIVED, ONGOING_PROBES, ONGOING_QUERIES,
 };
 use crate::{
     behaviour::{
@@ -52,11 +48,7 @@ use crate::{
         wrapped::{BehaviourWrapper, TToSwarm, Wrapped},
     },
     cli::BootNode,
-    protocol::{
-        APPROX_EPOCH_LEN, EPOCH_SEAL_TIMEOUT, ID_PROTOCOL, KEEP_LAST_WORKER_LOGS,
-        LOGS_COLLECTED_MIN_INTERVAL, LOGS_COLLECTED_TOPIC, LOGS_MIN_INTERVAL, MAX_PUBSUB_MSG_SIZE,
-        PINGS_MIN_INTERVAL, PING_TOPIC, WORKER_LOGS_TOPIC,
-    },
+    protocol::{HEARTBEATS_MIN_INTERVAL, HEARTBEAT_TOPIC, ID_PROTOCOL, MAX_PUBSUB_MSG_SIZE},
     record_event,
     util::{addr_is_reachable, parse_env_var},
     AgentInfo, Multiaddr, PeerId,
@@ -129,8 +121,6 @@ pub struct BaseBehaviour {
     outbound_conns: HashMap<PeerId, u32>,
     probe_timeouts: FuturesMap<PeerId, ()>,
     registered_workers: Arc<RwLock<HashSet<PeerId>>>,
-    logs_collected: Arc<RwLock<HashMap<String, u64>>>, // peer_id (base58) -> highest collected seq_no
-    max_pubsub_msg_size: usize,
 }
 
 #[allow(dead_code)]
@@ -192,103 +182,30 @@
             outbound_conns: Default::default(),
             probe_timeouts: FuturesMap::new(config.probe_timeout, config.max_concurrent_probes),
             registered_workers: Arc::new(RwLock::new(Default::default())),
-            logs_collected: Arc::new(RwLock::new(Default::default())),
-            max_pubsub_msg_size: config.max_pubsub_msg_size,
         }
     }
 
-    pub fn subscribe_pings(&mut self) {
-        let registered_workers = self.registered_workers.clone();
-        let config = MsgValidationConfig::new(PINGS_MIN_INTERVAL).max_burst(2).msg_validator(
-            move |peer_id: PeerId, _seq_no: u64, _data: &[u8]| {
-                if !registered_workers.read().contains(&peer_id) {
-                    return Err(ValidationError::Invalid("Worker not registered"));
-                }
-                Ok(())
-            },
-        );
-        self.inner.pubsub.subscribe(PING_TOPIC, config);
+    pub fn keypair(&self) -> &Keypair {
+        &self.keypair
     }
 
-    pub fn subscribe_worker_logs(&mut self, logs_collector_id: PeerId) {
-        // Unordered messages need to be allowed, because we're interested in all messages from
-        // each worker, not only the most recent one (as in the case of pings).
+    pub fn subscribe_heartbeats(&mut self) {
         let registered_workers = self.registered_workers.clone();
-        let logs_collected = self.logs_collected.clone();
-        let config = MsgValidationConfig::new(LOGS_MIN_INTERVAL)
-            .max_burst(10)
-            .keep_last(KEEP_LAST_WORKER_LOGS)
-            .msg_validator(move |peer_id: PeerId, _seq_no: u64, msg: &[u8]| {
+        let config = MsgValidationConfig::new(HEARTBEATS_MIN_INTERVAL).max_burst(2).msg_validator(
+            move |peer_id: PeerId, _seq_no: u64, _data: &[u8]| {
                 if !registered_workers.read().contains(&peer_id) {
                     return Err(ValidationError::Invalid("Worker not registered"));
                 }
-                let Ok(WorkerLogsMsg {
-                    msg: Some(worker_logs_msg::Msg::QueryLogs(msg)),
-                }) = WorkerLogsMsg::decode(msg)
-                else {
-                    return Err(ValidationError::Invalid("Invalid worker logs"));
-                };
-                // Logs are sorted by seq_no, so we need to check the last one only
-                let (last_timestamp, last_seq_no) = match msg.queries_executed.last() {
-                    Some(query_executed) => (
-                        query_executed.timestamp_ms.unwrap_or_default(),
-                        query_executed.seq_no.unwrap_or_default(),
-                    ),
-                    None => return Err(ValidationError::Invalid("Empty worker logs")),
-                };
-                // Don't propagate logs which are old & no longer relevant
-                let last_log_time = SystemTime::UNIX_EPOCH + Duration::from_millis(last_timestamp);
-                if last_log_time + EPOCH_SEAL_TIMEOUT + APPROX_EPOCH_LEN < SystemTime::now() {
-                    return Err(ValidationError::Ignored("Old worker logs"));
-                }
-                // Don't propagate worker logs which have already been collected
-                match logs_collected.read().get(&peer_id.to_base58()) {
-                    Some(seq_no) if *seq_no >= last_seq_no => {
-                        Err(ValidationError::Ignored("Logs already collected"))
-                    }
-                    _ => Ok(()),
-                }
-            });
-        self.inner.pubsub.subscribe(WORKER_LOGS_TOPIC, config);
-
-        let config = MsgValidationConfig::new(LOGS_COLLECTED_MIN_INTERVAL).msg_validator(
-            move |peer_id: PeerId, _seq_no: u64, _msg: &[u8]| {
-                if peer_id == logs_collector_id {
-                    Ok(())
-                } else {
-                    Err(ValidationError::Invalid("Invalid logs collector ID"))
-                }
+                Ok(())
             },
         );
-        self.inner.pubsub.subscribe(LOGS_COLLECTED_TOPIC, config);
-    }
-
-    pub fn sign<T: SignedMessage>(&self, msg: &mut T) {
-        msg.sign(&self.keypair)
-    }
-
-    pub fn publish_ping(&mut self, mut ping: Ping) {
-        self.sign(&mut ping);
-        self.inner.pubsub.publish(PING_TOPIC, ping.encode_to_vec());
-        #[cfg(feature = "metrics")]
-        PINGS_PUBLISHED.inc();
-    }
-
-    pub fn publish_worker_logs(&mut self, mut logs: Vec<QueryExecuted>) {
-        for log in &mut logs {
-            self.sign(log);
-        }
-        for msg in bundle_logs(logs, self.max_pubsub_msg_size / 2) {
-            self.inner.pubsub.publish(WORKER_LOGS_TOPIC, msg.encode_to_vec());
-            #[cfg(feature = "metrics")]
-            WORKER_LOGS_PUBLISHED.inc();
-        }
+        self.inner.pubsub.subscribe(HEARTBEAT_TOPIC, config);
     }
 
-    pub fn publish_logs_collected(&mut self, logs_collected: &LogsCollected) {
-        self.inner.pubsub.publish(LOGS_COLLECTED_TOPIC, logs_collected.encode_to_vec());
+    pub fn publish_heartbeat(&mut self, heartbeat: Heartbeat) {
+        self.inner.pubsub.publish(HEARTBEAT_TOPIC, heartbeat.encode_to_vec());
         #[cfg(feature = "metrics")]
-        LOGS_COLLECTED_PUBLISHED.inc();
+        HEARTBEATS_PUBLISHED.inc();
     }
 
     pub fn find_and_dial(&mut self, peer_id: PeerId) {
@@ -392,15 +309,10 @@
 #[derive(Debug, Clone)]
 pub enum BaseBehaviourEvent {
-    Ping {
+    Heartbeat {
         peer_id: PeerId,
-        ping: Ping,
+        heartbeat: Heartbeat,
     },
-    WorkerQueryLogs {
-        peer_id: PeerId,
-        query_logs: QueryLogs,
-    },
-    LogsCollected(LogsCollected),
     PeerProbed(PeerProbed),
 }
 
@@ -631,24 +543,12 @@
         log::trace!("Pub-sub message received: peer_id={peer_id} topic={topic}");
         let data = data.as_ref();
         let ev = match topic {
-            PING_TOPIC => decode_ping(peer_id, data)?,
-            WORKER_LOGS_TOPIC => decode_worker_logs_msg(peer_id, data)?,
-            LOGS_COLLECTED_TOPIC => self.on_logs_collected(data)?,
+            HEARTBEAT_TOPIC => decode_heartbeat(peer_id, data)?,
             _ => return None,
         };
         Some(ToSwarm::GenerateEvent(ev))
     }
 
-    fn on_logs_collected(&mut self, data: &[u8]) -> Option<BaseBehaviourEvent> {
-        let logs_collected = LogsCollected::decode(data)
-            .map_err(|e| log::warn!("Error decoding logs collected msg: {e:?}"))
-            .ok()?;
-        *self.logs_collected.write() = logs_collected.sequence_numbers.clone();
-        #[cfg(feature = "metrics")]
-        LOGS_COLLECTED_RECEIVED.inc();
-        Some(BaseBehaviourEvent::LogsCollected(logs_collected))
-    }
-
     fn on_nodes_update(&mut self, nodes: NetworkNodes) -> Option<TToSwarm<Self>> {
         log::debug!("Updating registered workers");
         *self.registered_workers.write() = nodes.workers;
@@ -656,63 +556,11 @@
     }
 }
 
-fn decode_ping(peer_id: PeerId, data: &[u8]) -> Option<BaseBehaviourEvent> {
-    let mut ping = Ping::decode(data).map_err(|e| log::warn!("Error decoding ping: {e:?}")).ok()?;
-    let worker_id = peer_id.to_base58();
-    if !ping.worker_id.as_ref().is_some_and(|id| id == &worker_id) {
-        log::warn!("Invalid worker_id in ping from {worker_id}");
-        return None;
-    }
-    if !ping.verify_signature(&peer_id) {
-        log::warn!("Invalid ping signature from {worker_id}");
-        return None;
-    }
-    #[cfg(feature = "metrics")]
-    PINGS_RECEIVED.inc();
-    Some(BaseBehaviourEvent::Ping { peer_id, ping })
-}
-
-fn decode_worker_logs_msg(peer_id: PeerId, data: &[u8]) -> Option<BaseBehaviourEvent> {
-    let msg = WorkerLogsMsg::decode(data)
-        .map_err(|e| log::warn!("Error decoding worker logs: {e:?}"))
+fn decode_heartbeat(peer_id: PeerId, data: &[u8]) -> Option<BaseBehaviourEvent> {
+    let heartbeat = Heartbeat::decode(data)
+        .map_err(|e| log::warn!("Error decoding heartbeat: {e:?}"))
         .ok()?;
-    match msg.msg {
-        Some(worker_logs_msg::Msg::QueryLogs(query_logs)) => {
-            #[cfg(feature = "metrics")]
-            WORKER_LOGS_RECEIVED.inc();
-            Some(BaseBehaviourEvent::WorkerQueryLogs {
-                peer_id,
-                query_logs,
-            })
-        }
-        _ => None,
-    }
-}
-
-fn bundle_logs(
-    logs: impl IntoIterator<Item = QueryExecuted>,
-    size_limit: usize,
-) -> impl Iterator<Item = WorkerLogsMsg> {
-    let mut logs: VecDeque<QueryExecuted> = logs.into_iter().collect();
-
-    // Bundle into chunks of at most `size_limit` total size
-    std::iter::from_fn(move || {
-        let mut msg = WorkerLogsMsg::default();
-        while let Some(next_log) = logs.pop_front() {
-            msg.push(next_log);
-            if msg.encoded_len() > size_limit {
-                if msg.len() == 1 {
-                    // Single message is larger than the limit – drop it
-                    msg.pop();
-                    log::warn!("Query log is too large and will be dropped");
-                } else {
-                    // Put the message back in the queue to include in the next bundle
-                    let log = msg.pop().unwrap();
-                    logs.push_front(log);
-                    break;
-                }
-            }
-        }
-        (!msg.is_empty()).then_some(msg)
-    })
+    #[cfg(feature = "metrics")]
+    HEARTBEATS_RECEIVED.inc();
+    Some(BaseBehaviourEvent::Heartbeat { peer_id, heartbeat })
 }
diff --git a/crates/transport/src/behaviour/node_whitelist.rs b/crates/transport/src/behaviour/node_whitelist.rs
index 4e79b61..d626973 100644
--- a/crates/transport/src/behaviour/node_whitelist.rs
+++ b/crates/transport/src/behaviour/node_whitelist.rs
@@ -65,7 +65,7 @@ impl WhitelistBehavior {
         result: Result<NetworkNodes, ClientError>,
     ) -> Option<TToSwarm<Self>> {
         let nodes = result
-            .map_err(|e| log::error!("Error retrieving registered nodes from chain: {e:?}"))
+            .map_err(|e| log::warn!("Error retrieving registered nodes from chain: {e:?}"))
             .ok()?;
 
         let all_nodes = nodes.clone().all();
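Heartbeats are still validated per sender before they propagate through gossipsub. A sketch of the validator hook used by `subscribe_heartbeats` above, extended (as an illustration only, not code from this patch) to also reject messages that don't decode; the module paths are assumptions about the crate's internal layout:

    use libp2p::PeerId;
    use prost::Message;
    use sqd_messages::Heartbeat;

    use crate::behaviour::pubsub::{MsgValidationConfig, ValidationError};
    use crate::protocol::HEARTBEATS_MIN_INTERVAL;

    fn strict_heartbeat_validation() -> MsgValidationConfig {
        MsgValidationConfig::new(HEARTBEATS_MIN_INTERVAL)
            .max_burst(2)
            .msg_validator(|_peer_id: PeerId, _seq_no: u64, data: &[u8]| {
                // Reject messages that don't even decode as a Heartbeat
                if Heartbeat::decode(data).is_err() {
                    return Err(ValidationError::Invalid("Malformed heartbeat"));
                }
                Ok(())
            })
    }
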
diff --git a/crates/transport/src/behaviour/pubsub.rs b/crates/transport/src/behaviour/pubsub.rs
index 26fa140..f6e333c 100644
--- a/crates/transport/src/behaviour/pubsub.rs
+++ b/crates/transport/src/behaviour/pubsub.rs
@@ -246,7 +246,7 @@ impl PubsubBehaviour {
             Err(PublishError::InsufficientPeers)
                 if topic.subscribed_at.elapsed() <= SUBSCRIPTION_TIMEOUT =>
             {
-                log::info!("Waiting for peers to publish to {topic_name}")
+                log::info!("Waiting for peers to be able to publish to {topic_name}")
             }
             Err(e) => log::error!("Error publishing message to {topic_name}: {e:?}"),
             Ok(_) => log::debug!("Message published to {topic_name}"),
diff --git a/crates/transport/src/behaviour/request_client.rs b/crates/transport/src/behaviour/request_client.rs
index 0337028..0eb2de7 100644
--- a/crates/transport/src/behaviour/request_client.rs
+++ b/crates/transport/src/behaviour/request_client.rs
@@ -45,7 +45,7 @@ pub enum ClientEvent<Res> {
     },
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 pub enum Timeout {
     /// Peer lookup or connection establishing timed out
     Lookup,
@@ -212,7 +212,7 @@
         log::debug!("Request {req_id} failed: {error}");
 
         // If request was submitted for the first time and dial failed, try to find peer and connect
-        // Keep the request contents in buffer for re-submitting, if loookup is successful
+        // Keep the request contents in buffer for re-submitting, if lookup is successful
         if matches!(&error, OutboundFailure::DialFailure)
             && !self.resubmitted_requests.contains_key(&req_id)
         {
diff --git a/crates/transport/src/behaviour/wrapped.rs b/crates/transport/src/behaviour/wrapped.rs
index 509c946..73e9a8d 100644
--- a/crates/transport/src/behaviour/wrapped.rs
+++ b/crates/transport/src/behaviour/wrapped.rs
@@ -32,7 +32,7 @@ pub trait BehaviourWrapper {
         None
     }
     fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<impl IntoIterator<Item = TToSwarm<Self>>> {
-        Poll::<Option<Vec<TToSwarm<Self>>>>::Pending
+        Poll::<Vec<TToSwarm<Self>>>::Pending
     }
 }
diff --git a/crates/transport/src/builder.rs b/crates/transport/src/builder.rs
index 818aa7e..35f11be 100644
--- a/crates/transport/src/builder.rs
+++ b/crates/transport/src/builder.rs
@@ -24,8 +24,7 @@ use crate::actors::gateway::{
 };
 #[cfg(feature = "logs-collector")]
 use crate::actors::logs_collector::{
-    self, LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent,
-    LogsCollectorTransportHandle,
+    self, LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorTransport,
 };
 #[cfg(feature = "observer")]
 use crate::actors::observer::{
@@ -37,7 +36,7 @@ use crate::actors::peer_checker::{
 };
 #[cfg(feature = "pings-collector")]
 use crate::actors::pings_collector::{
-    self, Ping, PingsCollectorBehaviour, PingsCollectorConfig, PingsCollectorTransportHandle,
+    self, Heartbeat, PingsCollectorBehaviour, PingsCollectorConfig, PingsCollectorTransportHandle,
 };
 #[cfg(feature = "scheduler")]
 use crate::actors::scheduler::{
@@ -65,7 +64,7 @@ pub struct P2PTransportBuilder {
 impl P2PTransportBuilder {
     pub async fn from_cli(args: TransportArgs, agent_info: AgentInfo) -> anyhow::Result<Self> {
         let listen_addrs = args.listen_addrs();
-        let keypair = get_keypair(args.key).await?;
+        let keypair = get_keypair(Some(args.key)).await?;
         let contract_client = sqd_contract_client::get_client(&args.rpc).await?;
         let dht_protocol = dht_protocol(args.rpc.network);
         Ok(Self {
@@ -210,7 +209,7 @@ impl P2PTransportBuilder {
     pub fn build_logs_collector(
         self,
         config: LogsCollectorConfig,
-    ) -> Result<(impl Stream<Item = LogsCollectorEvent>, LogsCollectorTransportHandle), Error> {
+    ) -> Result<LogsCollectorTransport, Error> {
         let local_peer_id = self.local_peer_id();
         let swarm = self.build_swarm(|base| LogsCollectorBehaviour::new(base, local_peer_id, config))?;
@@ -231,8 +230,7 @@
         self,
         config: ObserverConfig,
     ) -> Result<(impl Stream<Item = ObserverEvent>, ObserverTransportHandle), Error> {
-        let swarm =
-            self.build_swarm(|base| ObserverBehaviour::new(base, config.logs_collector_id))?;
+        let swarm = self.build_swarm(|base| ObserverBehaviour::new(base))?;
         Ok(observer::start_transport(swarm, config))
     }
 
@@ -240,7 +238,7 @@
     pub fn build_pings_collector(
         self,
         config: PingsCollectorConfig,
-    ) -> Result<(impl Stream<Item = Ping>, PingsCollectorTransportHandle), Error> {
+    ) -> Result<(impl Stream<Item = Heartbeat>, PingsCollectorTransportHandle), Error> {
         let swarm = self.build_swarm(PingsCollectorBehaviour::new)?;
         Ok(pings_collector::start_transport(swarm, config))
     }
@@ -276,7 +274,8 @@
             }
             break;
         }
-        let swarm = self.build_swarm(|base| WorkerBehaviour::new(base, local_peer_id, config))?;
+        let swarm =
+            self.build_swarm(|base| WorkerBehaviour::new(base, config.clone()))?;
         Ok(worker::start_transport(swarm, config))
     }
 }
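Wiring sketch for the changed builder API: `build_logs_collector` now returns a `LogsCollectorTransport` value instead of an event stream plus handle, and its `run` future must be driven for `request_logs` calls to make progress. The re-exported type names are assumptions about the crate's public surface:

    use sqd_network_transport::{AgentInfo, LogsCollectorConfig, P2PTransportBuilder, TransportArgs};
    use tokio_util::sync::CancellationToken;

    async fn run_collector(args: TransportArgs, agent_info: AgentInfo) -> anyhow::Result<()> {
        let builder = P2PTransportBuilder::from_cli(args, agent_info).await?;
        let transport = builder
            .build_logs_collector(LogsCollectorConfig::default())
            .expect("failed to build logs collector transport");
        let cancel = CancellationToken::new();
        // Typically raced against the polling loop that calls transport.request_logs()
        transport.run(cancel).await;
        Ok(())
    }
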
diff --git a/crates/transport/src/cli.rs b/crates/transport/src/cli.rs
index c805d43..0755688 100644
--- a/crates/transport/src/cli.rs
+++ b/crates/transport/src/cli.rs
@@ -4,27 +4,24 @@ use libp2p::Multiaddr;
 use sqd_contract_client::RpcArgs;
 use std::{path::PathBuf, str::FromStr};
 
-#[derive(Args)]
+#[derive(Args, Clone)]
 pub struct TransportArgs {
     #[arg(short, long, env = "KEY_PATH", help = "Path to libp2p key file")]
-    pub key: Option<PathBuf>,
+    pub key: PathBuf,
 
     #[arg(
         long,
         env,
         help = "Addresses on which the p2p node will listen",
         value_delimiter = ',',
-        num_args = 1..,
-        default_value = "/ip4/0.0.0.0/udp/0/quic-v1"
     )]
-    p2p_listen_addrs: Vec<Multiaddr>,
+    pub p2p_listen_addrs: Vec<Multiaddr>,
 
     #[arg(
         long,
         env,
         help = "Public address(es) on which the p2p node can be reached",
         value_delimiter = ',',
-        num_args = 1..,
     )]
     pub p2p_public_addrs: Vec<Multiaddr>,
diff --git a/crates/transport/src/codec.rs b/crates/transport/src/codec.rs
index 74d2f7a..18aeac9 100644
--- a/crates/transport/src/codec.rs
+++ b/crates/transport/src/codec.rs
@@ -5,8 +5,6 @@ use futures::{AsyncReadExt, AsyncWriteExt};
 use libp2p::request_response;
 use prost::Message;
 
-pub const ACK_SIZE: u64 = 4;
-
 pub struct ProtoCodec<Req, Res> {
     _req: PhantomData<Req>,
     _res: PhantomData<Res>,
@@ -70,7 +68,7 @@ impl request_response::Codec for ProtoCodec<Req, Res>
     where
         T: futures::AsyncRead + Unpin + Send,
     {
         let mut buf = Vec::new();
-        let bytes_read = io.take(self.max_res_size).read_to_end(&mut buf).await?;
+        let bytes_read = io.take(self.max_res_size + 1).read_to_end(&mut buf).await?;
         if bytes_read as u64 > self.max_res_size {
             return Err(std::io::Error::new(ErrorKind::InvalidData, "Response too large to read"));
         }
diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs
index 8fb6b35..d401402 100644
--- a/crates/transport/src/lib.rs
+++ b/crates/transport/src/lib.rs
@@ -22,6 +22,7 @@ use tokio::sync::mpsc;
 
 pub use libp2p::{
     identity::{Keypair, ParseError as IdParseError, PublicKey},
+    request_response::ResponseChannel,
     Multiaddr, PeerId,
 };
 
@@ -46,11 +47,11 @@ pub mod util;
 
 #[cfg(feature = "gateway")]
 pub use crate::actors::gateway::{
-    GatewayBehaviour, GatewayConfig, GatewayEvent, GatewayTransportHandle,
+    GatewayBehaviour, GatewayConfig, GatewayEvent, GatewayTransportHandle, QueryFailure,
 };
 #[cfg(feature = "logs-collector")]
 pub use crate::actors::logs_collector::{
-    LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent, LogsCollectorTransportHandle,
+    LogsCollectorBehaviour, LogsCollectorConfig, LogsCollectorEvent, LogsCollectorTransport,
 };
 #[cfg(feature = "observer")]
 pub use crate::actors::observer::{
@@ -63,7 +64,7 @@ pub use crate::actors::peer_checker::{
 };
 #[cfg(feature = "pings-collector")]
 pub use crate::actors::pings_collector::{
-    Ping, PingsCollectorBehaviour, PingsCollectorConfig, PingsCollectorTransportHandle,
+    Heartbeat, PingsCollectorBehaviour, PingsCollectorConfig, PingsCollectorTransportHandle,
 };
 #[cfg(feature = "scheduler")]
 pub use crate::actors::scheduler::{
diff --git a/crates/transport/src/metrics.rs b/crates/transport/src/metrics.rs
index 8238ebd..f53aef4 100644
--- a/crates/transport/src/metrics.rs
+++ b/crates/transport/src/metrics.rs
@@ -17,14 +17,8 @@ lazy_static! {
     pub static ref DROPPED: Family<Vec<(String, String)>, Counter> = Default::default();
     pub static ref DISCARDED_MESSAGES: Counter = Default::default();
 
-    pub static ref PINGS_PUBLISHED: Counter = Default::default();
-    pub static ref PINGS_RECEIVED: Counter = Default::default();
-    pub static ref PONGS_SENT: Counter = Default::default();
-    pub static ref PONGS_RECEIVED: Counter = Default::default();
-    pub static ref WORKER_LOGS_PUBLISHED: Counter = Default::default();
-    pub static ref WORKER_LOGS_RECEIVED: Counter = Default::default();
-    pub static ref LOGS_COLLECTED_PUBLISHED: Counter = Default::default();
-    pub static ref LOGS_COLLECTED_RECEIVED: Counter = Default::default();
+    pub static ref HEARTBEATS_PUBLISHED: Counter = Default::default();
+    pub static ref HEARTBEATS_RECEIVED: Counter = Default::default();
 }
 
 pub static LIBP2P_METRICS: OnceCell<Metrics> = OnceCell::const_new();
@@ -61,39 +55,13 @@ pub fn register_metrics(registry: &mut Registry) {
     );
     registry.register("dropped", "The number of dropped messages/events", DROPPED.clone());
     registry.register(
-        "pings_published",
-        "The number of published ping messages",
-        PINGS_PUBLISHED.clone(),
+        "heartbeats_published",
+        "The number of published heartbeat messages",
+        HEARTBEATS_PUBLISHED.clone(),
     );
     registry.register(
-        "pings_received",
-        "The number of received ping messages",
-        PINGS_RECEIVED.clone(),
-    );
-    registry.register("pongs_sent", "The number of sent pong messages", PONGS_SENT.clone());
-    registry.register(
-        "pongs_received",
-        "The number of received pong messages",
-        PONGS_RECEIVED.clone(),
-    );
-    registry.register(
-        "worker_logs_published",
-        "The number of published worker logs messages",
-        WORKER_LOGS_PUBLISHED.clone(),
-    );
-    registry.register(
-        "worker_logs_received",
-        "The number of received worker logs messages",
-        WORKER_LOGS_RECEIVED.clone(),
-    );
-    registry.register(
-        "logs_collected_published",
-        "The number of published logs collected messages",
-        LOGS_COLLECTED_PUBLISHED.clone(),
-    );
-    registry.register(
-        "logs_collected_received",
-        "The number of received logs collected messages",
-        LOGS_COLLECTED_RECEIVED.clone(),
+        "heartbeats_received",
+        "The number of received heartbeat messages",
+        HEARTBEATS_RECEIVED.clone(),
     );
 }
diff --git a/crates/transport/src/protocol.rs b/crates/transport/src/protocol.rs
index dfeef90..9ada3d7 100644
--- a/crates/transport/src/protocol.rs
+++ b/crates/transport/src/protocol.rs
@@ -4,27 +4,24 @@ use libp2p::StreamProtocol;
 
 use sqd_contract_client::Network;
 
-pub const PING_TOPIC: &str = "/subsquid/worker_pings/1.0.0";
-pub const WORKER_LOGS_TOPIC: &str = "/subsquid/worker_query_logs/1.1.0";
-pub const LOGS_COLLECTED_TOPIC: &str = "/subsquid/logs_collected/1.0.0";
+pub const OLD_PING_TOPIC: &str = "/subsquid/worker_pings/1.0.0";
+pub const HEARTBEAT_TOPIC: &str = "/sqd/worker_heartbeats/1.1.0";
+pub const PORTAL_LOGS_TOPIC: &str = "/sqd/portal_logs/1.1.0";
 pub const 
diff --git a/crates/transport/src/protocol.rs b/crates/transport/src/protocol.rs
index dfeef90..9ada3d7 100644
--- a/crates/transport/src/protocol.rs
+++ b/crates/transport/src/protocol.rs
@@ -4,27 +4,24 @@
 use libp2p::StreamProtocol;
 use sqd_contract_client::Network;
 
-pub const PING_TOPIC: &str = "/subsquid/worker_pings/1.0.0";
-pub const WORKER_LOGS_TOPIC: &str = "/subsquid/worker_query_logs/1.1.0";
-pub const LOGS_COLLECTED_TOPIC: &str = "/subsquid/logs_collected/1.0.0";
+pub const OLD_PING_TOPIC: &str = "/subsquid/worker_pings/1.0.0";
+pub const HEARTBEAT_TOPIC: &str = "/sqd/worker_heartbeats/1.1.0";
+pub const PORTAL_LOGS_TOPIC: &str = "/sqd/portal_logs/1.1.0";
 
 pub const ID_PROTOCOL: &str = "/subsquid/1.0.0";
-pub const QUERY_PROTOCOL: &str = "/subsquid/query/1.0.0";
-pub const GATEWAY_LOGS_PROTOCOL: &str = "/subsquid/gateway-logs/1.0.0";
-pub const PONG_PROTOCOL: &str = "/subsquid/pong/1.0.0";
+pub const QUERY_PROTOCOL: &str = "/sqd/query/1.1.0";
+pub const WORKER_LOGS_PROTOCOL: &str = "/sqd/worker_logs/1.1.0";
 
-pub const MAX_QUERY_SIZE: u64 = 1024 * 1024;
+pub const MAX_RAW_QUERY_SIZE: u64 = 256 * 1024;
+pub const MAX_QUERY_MSG_SIZE: u64 = 257 * 1024;
 pub const MAX_QUERY_RESULT_SIZE: u64 = 100 * 1024 * 1024;
-pub const MAX_GATEWAY_LOG_SIZE: u64 = 1024 * 1024;
-pub const MAX_PONG_SIZE: u64 = 10 * 1024 * 1024;
+pub const MAX_LOGS_REQUEST_SIZE: u64 = 100;
+pub const MAX_LOGS_RESPONSE_SIZE: u64 = 10 * 1024 * 1024;
 pub const MAX_PUBSUB_MSG_SIZE: usize = 65536;
 
-pub const KEEP_LAST_WORKER_LOGS: u64 = 100;
-pub const PINGS_MIN_INTERVAL: Duration = Duration::from_secs(20);
-pub const LOGS_MIN_INTERVAL: Duration = Duration::from_secs(120);
-pub const LOGS_COLLECTED_MIN_INTERVAL: Duration = Duration::from_secs(60);
-pub const EPOCH_SEAL_TIMEOUT: Duration = Duration::from_secs(600);
+pub const HEARTBEATS_MIN_INTERVAL: Duration = Duration::from_secs(20);
 pub const APPROX_EPOCH_LEN: Duration = Duration::from_secs(1200);
+pub const MAX_TIME_LAG: Duration = Duration::from_secs(60);
 
 pub const fn dht_protocol(network: Network) -> StreamProtocol {
     match network {