From ae8858c58e0fd099ec4a81d55270a0bfd8521df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petar=20Vujovi=C4=87?= Date: Sun, 7 Jul 2024 11:12:29 +0200 Subject: [PATCH] feat(host): extract worker message handling (#307) * feat(host): extract worker message handling * [WIP](host): add initial cancel API for proof tasks * fix(host): add return expression * fix(host): wrap prover into a option string * fix(host): cleanup imports * refactor(host): move proof handlers into separate module * feat(host): use actor pattern to spawn tasks * refactor(host): clean up task spawning * chore(deps): remove hyper * chore(deps): update dependencies * feat(host): add concurrency limit to proof tasks * fix(pipeline): convert buffer types * fix(harness): pass second argument * refactor(host): move allocator next to memory module * feat(host): handle task cancell status and refactor handler * feat(host,task_manager): add documentation to v2 API * fix(host): fix typo in docs * chore(lib,core,host): use tracing instead of println * feat(host): update cancellation route * refactor(host): move proof file into module * [WIP](core,host,task_manager): stub prune and report endpoints * refactor(task_manager): use `TaskDescriptor` inside `TaskReport` * feat(host): don't start work unless the latest status is `Registered` * feat(host): update prune docs with `PruneStatus` * feat(task_manager): refactor in memory implementation --- Cargo.lock | 252 +++++++-------- Cargo.toml | 11 +- core/src/interfaces.rs | 14 + core/src/preflight.rs | 6 +- harness/macro/src/lib.rs | 2 +- host/Cargo.toml | 9 +- host/src/lib.rs | 297 ++++-------------- host/src/proof.rs | 212 +++++++++++++ host/src/server/api/v2/mod.rs | 102 +++++- host/src/server/api/v2/proof/cancel.rs | 70 +++++ .../server/api/v2/{proof.rs => proof/mod.rs} | 123 ++++---- host/src/server/api/v2/proof/prune.rs | 33 ++ host/src/server/api/v2/proof/report.rs | 36 +++ lib/src/utils.rs | 13 +- pipeline/src/builder.rs | 6 +- provers/sgx/setup/Cargo.toml | 1 - task_manager/Cargo.toml | 1 + task_manager/src/adv_sqlite.rs | 129 +++----- task_manager/src/lib.rs | 51 +-- task_manager/src/mem_db.rs | 124 +++----- 20 files changed, 849 insertions(+), 643 deletions(-) create mode 100644 host/src/proof.rs create mode 100644 host/src/server/api/v2/proof/cancel.rs rename host/src/server/api/v2/{proof.rs => proof/mod.rs} (53%) create mode 100644 host/src/server/api/v2/proof/prune.rs create mode 100644 host/src/server/api/v2/proof/report.rs diff --git a/Cargo.lock b/Cargo.lock index a3f9fb144..7915a025e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,7 +107,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy 0.7.34", + "zerocopy 0.7.35", ] [[package]] @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.5" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b155716bab55763c95ba212806cf43d05bcc70e5f35b02bad20cf5ec7fe11fed" +checksum = "a43b18702501396fa9bcdeecd533bc85fac75150d308fc0f6800a01e6234a003" dependencies = [ "alloy-rlp-derive", "arrayvec", @@ -342,9 +342,9 @@ dependencies = [ [[package]] name = "alloy-rlp-derive" -version = "0.3.5" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8037e03c7f462a063f28daec9fda285a9a89da003c552f8637a80b9c8fd96241" +checksum = "d83524c1f6162fcb5b0decf775498a125066c86dda6066ed609531b0e912f85a" dependencies = [ "proc-macro2", "quote", @@ -416,7 +416,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "serde", - "serde_with 3.8.1", + "serde_with 3.8.3", "thiserror", ] @@ -1118,7 +1118,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.0", "hyper-util", "itoa", "matchit", @@ -1183,7 +1183,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide", - "object 0.36.0", + "object 0.36.1", "rustc-demangle", "serde", ] @@ -1581,19 +1581,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cargo_metadata" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714a157da7991e23d90686b9524b9e12e0407a108647f52e9328f4b3d51ac7f" -dependencies = [ - "cargo-platform", - "semver 0.11.0", - "semver-parser", - "serde", - "serde_json", -] - [[package]] name = "cargo_metadata" version = "0.17.0" @@ -1624,9 +1611,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.101" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d" +checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490" dependencies = [ "jobserver", "libc", @@ -1660,7 +1647,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -1710,9 +1697,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.7" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f" +checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d" dependencies = [ "clap_builder", "clap_derive", @@ -1720,9 +1707,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.7" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f" +checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708" dependencies = [ "anstream", "anstyle", @@ -1732,9 +1719,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.5" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" +checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -3329,9 +3316,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe739944a5406424e080edccb6add95685130b9f160d5407c639c7df0c5836b0" +checksum = "96512db27971c2c3eece70a1e106fbe6c87760234e31e8f7e5634912fe52794a" dependencies = [ "serde", "typenum", @@ -3511,7 +3498,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.68", ] [[package]] @@ -3521,7 +3508,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.68", ] [[package]] @@ -3771,9 +3758,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" dependencies = [ "bytes", "futures-channel", @@ -3812,7 +3799,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.4.0", "hyper-util", "rustls 0.23.10", "rustls-pki-types", @@ -3843,7 +3830,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.0", "hyper-util", "native-tls", "tokio", @@ -3853,16 +3840,16 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.4.0", "pin-project-lite", "socket2 0.5.7", "tokio", @@ -4275,7 +4262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d" dependencies = [ "cfg-if", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -4463,9 +4450,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" -version = "2.0.4" +version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" dependencies = [ "mime", "unicase", @@ -4798,9 +4785,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" +checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" dependencies = [ "memchr", ] @@ -5234,7 +5221,7 @@ dependencies = [ "libc", "redox_syscall 0.5.2", "smallvec", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -5325,9 +5312,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.10" +version = "2.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "560131c633294438da9f7c4b08189194b20946c8274c6b9e38881a7874dc8ee8" +checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" dependencies = [ "memchr", "thiserror", @@ -5712,7 +5699,7 @@ dependencies = [ "anyhow", "assert_cmd", "bincode", - "clap 4.5.7", + "clap 4.5.8", "ethers-core 2.0.10", "kzg", "raiko-lib", @@ -5728,7 +5715,7 @@ dependencies = [ "rust-kzg-zkcrypto", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "sgx-prover", "sp1-driver", "thiserror", @@ -5758,11 +5745,10 @@ dependencies = [ "bytemuck", "cap", "cfg-if", - "clap 4.5.7", + "clap 4.5.8", "env_logger", "ethers-core 2.0.10", "flate2", - "hyper 0.14.29", "kzg", "lazy_static", "lru_time_cache", @@ -5784,7 +5770,7 @@ dependencies = [ "rust-kzg-zkcrypto", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "sgx-prover", "sha2", "sp1-driver", @@ -5832,7 +5818,7 @@ dependencies = [ "rust-kzg-zkcrypto", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "sha2", "sha3", "thiserror", @@ -5847,7 +5833,7 @@ name = "raiko-pipeline" version = "0.1.0" dependencies = [ "anyhow", - "cargo_metadata 0.12.3", + "cargo_metadata 0.18.1", "cfg-if", "chrono", "dirs", @@ -5877,12 +5863,11 @@ dependencies = [ "bytemuck", "cap", "cfg-if", - "clap 4.5.7", + "clap 4.5.8", "dirs", "env_logger", "ethers-core 2.0.10", "flate2", - "hyper 0.14.29", "lazy_static", "lru_time_cache", "once_cell", @@ -5895,7 +5880,7 @@ dependencies = [ "rstest", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "sgx-prover", "sha2", "structopt", @@ -5929,6 +5914,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "utoipa", ] [[package]] @@ -6216,7 +6202,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.0", "hyper-rustls 0.27.2", "hyper-tls 0.6.0", "hyper-util", @@ -6253,9 +6239,9 @@ dependencies = [ [[package]] name = "reqwest-middleware" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a45d100244a467870f6cb763c4484d010a6bed6bd610b3676e3825d93fb4cfbd" +checksum = "39346a33ddfe6be00cbc17a34ce996818b97b230b87229f10114693becca1268" dependencies = [ "anyhow", "async-trait", @@ -6631,7 +6617,7 @@ source = "git+https://github.com/taikoxyz/taiko-reth.git?branch=v1.0.0-rc.2-taik dependencies = [ "futures-util", "reqwest 0.12.5", - "serde_with 3.8.1", + "serde_with 3.8.3", "thiserror", "tokio", ] @@ -6678,7 +6664,7 @@ dependencies = [ "alloy-rlp", "enr 0.12.1", "secp256k1", - "serde_with 3.8.1", + "serde_with 3.8.3", "thiserror", "tokio", "url", @@ -6902,7 +6888,7 @@ name = "reth-tracing" version = "1.0.0-rc.2" source = "git+https://github.com/taikoxyz/taiko-reth.git?branch=v1.0.0-rc.2-taiko#3043c66ac743ef765f7c403f589d124d115ff9eb" dependencies = [ - "clap 4.5.7", + "clap 4.5.8", "eyre", "rolling-file", "tracing", @@ -7209,7 +7195,7 @@ dependencies = [ "risc0-zkvm", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "tracing", "typetag", ] @@ -7360,9 +7346,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7699249cc2c7d71939f30868f47e9d7add0bdc030d90ee10bfd16887ff8bb1c8" +checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" dependencies = [ "bytemuck", "byteorder", @@ -7399,9 +7385,9 @@ dependencies = [ [[package]] name = "rstest" -version = "0.18.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +checksum = "9afd55a67069d6e434a95161415f5beeada95a01c7b815508a82dcb0e1593682" dependencies = [ "futures", "futures-timer", @@ -7411,12 +7397,13 @@ dependencies = [ [[package]] name = "rstest_macros" -version = "0.18.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +checksum = "4165dfae59a39dd41d8dec720d3cbfbc71f69744efb480a3920f5d4e0cc6798d" dependencies = [ "cfg-if", "glob", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "regex", @@ -7592,7 +7579,7 @@ dependencies = [ "once_cell", "ring 0.17.8", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.5", "subtle", "zeroize", ] @@ -7634,9 +7621,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" dependencies = [ "ring 0.17.8", "rustls-pki-types", @@ -7845,7 +7832,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" dependencies = [ "semver-parser", - "serde", ] [[package]] @@ -7909,9 +7895,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.118" +version = "1.0.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4" +checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" dependencies = [ "indexmap 2.2.6", "itoa", @@ -7968,9 +7954,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.8.1" +version = "3.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" +checksum = "e73139bc5ec2d45e6c5fd85be5a46949c1c39a4c18e56915f5eb4c12f975e377" dependencies = [ "base64 0.22.1", "chrono", @@ -7980,7 +7966,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with_macros 3.8.1", + "serde_with_macros 3.8.3", "time", ] @@ -7998,9 +7984,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.8.1" +version = "3.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" +checksum = "b80d3d6b56b64335c0180e5ffde23b3c5e08c14c585b51a15bd0e95393f46703" dependencies = [ "darling", "proc-macro2", @@ -8038,10 +8024,10 @@ name = "sgx-guest" version = "0.1.0" dependencies = [ "anyhow", - "base64 0.21.7", + "base64 0.22.1", "base64-serde", "bincode", - "clap 4.5.7", + "clap 4.5.8", "dirs", "hex", "raiko-lib", @@ -8073,7 +8059,7 @@ dependencies = [ "raiko-lib", "serde", "serde_json", - "serde_with 3.8.1", + "serde_with 3.8.3", "tokio", "url", ] @@ -8259,7 +8245,7 @@ dependencies = [ "curve25519-dalek", "elf", "elliptic-curve", - "generic-array 1.0.0", + "generic-array 1.1.0", "hex", "itertools 0.13.0", "k256", @@ -8289,7 +8275,7 @@ dependencies = [ "rayon-scan", "rrs-lib 0.1.0 (git+https://github.com/GregAC/rrs.git)", "serde", - "serde_with 3.8.1", + "serde_with 3.8.3", "size", "snowbridge-amcl", "sp1-derive", @@ -8352,7 +8338,7 @@ dependencies = [ "backtrace", "bincode", "bytemuck", - "clap 4.5.7", + "clap 4.5.8", "dirs", "futures", "hex", @@ -8460,7 +8446,7 @@ dependencies = [ "p3-symmetric", "p3-util", "serde", - "serde_with 3.8.1", + "serde_with 3.8.3", "sp1-core", "sp1-derive", "sp1-primitives", @@ -9377,7 +9363,7 @@ dependencies = [ "futures", "http 1.1.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.0", "prost", "reqwest 0.12.5", "serde", @@ -9590,18 +9576,20 @@ dependencies = [ [[package]] name = "utoipa-swagger-ui" -version = "6.0.0" +version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b39868d43c011961e04b41623e050aedf2cc93652562ff7935ce0f819aaf2da" +checksum = "943e0ff606c6d57d410fd5663a4d7c074ab2c5f14ab903b9514565e59fa1189e" dependencies = [ "axum", "mime_guess", "regex", + "reqwest 0.12.5", "rust-embed", "serde", "serde_json", + "url", "utoipa", - "zip 0.6.6", + "zip 1.1.4", ] [[package]] @@ -9888,7 +9876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -9897,7 +9885,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -9915,7 +9903,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -9935,18 +9923,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.5", - "windows_aarch64_msvc 0.52.5", - "windows_i686_gnu 0.52.5", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.5", - "windows_x86_64_gnu 0.52.5", - "windows_x86_64_gnullvm 0.52.5", - "windows_x86_64_msvc 0.52.5", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -9957,9 +9945,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -9969,9 +9957,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -9981,15 +9969,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" [[package]] name = "windows_i686_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -9999,9 +9987,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -10011,9 +9999,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -10023,9 +10011,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -10035,9 +10023,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" @@ -10122,11 +10110,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "zerocopy-derive 0.7.34", + "zerocopy-derive 0.7.35", ] [[package]] @@ -10140,9 +10128,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", @@ -10182,14 +10170,18 @@ dependencies = [ [[package]] name = "zip" -version = "0.6.6" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +checksum = "9cc23c04387f4da0374be4533ad1208cbb091d5c11d070dfef13676ad6497164" dependencies = [ - "byteorder", + "arbitrary", "crc32fast", "crossbeam-utils", + "displaydoc", "flate2", + "indexmap 2.2.6", + "num_enum 0.7.2", + "thiserror", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 83ef74500..0531c25c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ rlp = "0.5.2" hex = { version = "0.4.3" } hex-literal = "0.4" base64-serde = "0.7.0" -base64 = "0.21.7" +base64 = "0.22.1" libflate = { version = "2.0.0" } typetag = { version = "0.2.15" } num_enum = "0.7.2" @@ -129,13 +129,12 @@ cap = { git = "https://github.com/brechtpd/cap", branch = "more-stats", features axum = { version = "0.7.4", features = ["macros"] } tower-http = { version = "0.5.2", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } -utoipa-swagger-ui = { version = "6.0.0", features = ["axum"] } +utoipa-swagger-ui = { version = "7.1.0", features = ["axum"] } utoipa-scalar = { version = "0.1.0", features = ["axum"] } utoipa = { version = "4.2.0", features = ["axum_extras"] } structopt = "0.3.24" prometheus = { version = "0.13.3", features = ["process"] } tokio = { version = "^1.23", features = ["full"] } -hyper = { version = "0.14.27", features = ["server"] } reqwest = { version = "0.11.22", features = ["json"] } url = "2.5.0" async-trait = "0.1.80" @@ -151,7 +150,7 @@ secp256k1 = { version = "0.29", default-features = false, features = [ ] } # macro -syn = { version = "1.0", features = ["full"] } +syn = { version = "2.0.68", features = ["full"] } quote = "1.0" proc-macro2 = "1.0" @@ -166,7 +165,7 @@ rusqlite = { version = "0.31.0", features = ["bundled"] } # misc hashbrown = { version = "0.14", features = ["inline-more"] } tempfile = "3.8" -cargo_metadata = "0.12.1" +cargo_metadata = "0.18.1" clap = { version = "4.4.6", features = ["derive", "string", "env"] } lru_time_cache = "0.11.11" lazy_static = "1.4.0" @@ -176,7 +175,7 @@ cfg-if = "1.0.0" chrono = { version = "0.4", default-features = false } assert_cmd = "2.0" regex = "1.5.4" -rstest = "0.18" +rstest = "0.21" rand = "0.8.5" rand_core = "0.6.4" dirs = "5.0.1" diff --git a/core/src/interfaces.rs b/core/src/interfaces.rs index 33ed25780..ebd86e127 100644 --- a/core/src/interfaces.rs +++ b/core/src/interfaces.rs @@ -139,6 +139,20 @@ impl FromStr for ProofType { } } +impl TryFrom for ProofType { + type Error = RaikoError; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Self::Native), + 1 => Ok(Self::Sp1), + 2 => Ok(Self::Sgx), + 3 => Ok(Self::Risc0), + _ => Err(RaikoError::Conversion("Invalid u8".to_owned())), + } + } +} + impl ProofType { /// Run the prover driver depending on the proof type. pub async fn run_prover( diff --git a/core/src/preflight.rs b/core/src/preflight.rs index b426de656..ba1895ad0 100644 --- a/core/src/preflight.rs +++ b/core/src/preflight.rs @@ -32,7 +32,7 @@ use reth_evm_ethereum::taiko::decode_anchor; use reth_primitives::Block; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; pub async fn preflight( provider: BDP, @@ -137,7 +137,7 @@ pub async fn preflight( num_iterations += 1; } clear_line(); - println!("State data fetched in {num_iterations} iterations"); + info!("State data fetched in {num_iterations} iterations"); let provider_db = builder.db.as_mut().unwrap(); @@ -406,7 +406,7 @@ async fn get_blob_data_blobscan( let blob: BlobScanData = response.json().await?; Ok(blob_to_bytes(&blob.data)) } else { - println!( + error!( "Request {url} failed with status code: {}", response.status() ); diff --git a/harness/macro/src/lib.rs b/harness/macro/src/lib.rs index c5fdf7bcb..43c4092d8 100644 --- a/harness/macro/src/lib.rs +++ b/harness/macro/src/lib.rs @@ -20,7 +20,7 @@ impl syn::parse::Parse for EntryArgs { let test_modules: Option> = if input.peek(Token![,]) { input.parse::()?; // Parse and consume the comma // Now parse a list of module paths if they are present - Some(input.parse_terminated(Path::parse)?) + Some(input.parse_terminated(Path::parse, Token![,])?) } else { None }; diff --git a/host/Cargo.toml b/host/Cargo.toml index 6e997bb34..f41b538cf 100644 --- a/host/Cargo.toml +++ b/host/Cargo.toml @@ -50,7 +50,6 @@ serde = { workspace = true } serde_with = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -hyper = { workspace = true } env_logger = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } @@ -70,10 +69,10 @@ cfg-if = { workspace = true } cap = { workspace = true } # reth -reth-primitives.workspace = true -reth-evm.workspace = true -reth-evm-ethereum.workspace = true -reth-provider.workspace = true +reth-primitives = { workspace = true } +reth-evm = { workspace = true } +reth-evm-ethereum = { workspace = true } +reth-provider = { workspace = true } [dev-dependencies] assert_cmd = { workspace = true } diff --git a/host/src/lib.rs b/host/src/lib.rs index 651496932..c962a25ba 100644 --- a/host/src/lib.rs +++ b/host/src/lib.rs @@ -1,59 +1,24 @@ -pub mod interfaces; -pub mod metrics; -pub mod server; - use std::{alloc, path::PathBuf}; use anyhow::Context; use cap::Cap; use clap::Parser; use raiko_core::{ - interfaces::{ProofRequest, ProofRequestOpt, RaikoError}, + interfaces::{ProofRequest, ProofRequestOpt}, merge, - provider::{get_task_data, rpc::RpcBlockDataProvider}, - Raiko, }; -use raiko_lib::{consts::SupportedChainSpecs, Measurement}; -use raiko_task_manager::{get_task_manager, TaskManager, TaskManagerOpts, TaskStatus}; +use raiko_lib::consts::SupportedChainSpecs; +use raiko_task_manager::TaskManagerOpts; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::mpsc; -use tracing::{debug, error, info}; - -use crate::{ - interfaces::{HostError, HostResult}, - metrics::{ - inc_guest_error, inc_guest_req_count, inc_guest_success, inc_host_error, - inc_host_req_count, observe_guest_time, observe_prepare_input_time, observe_total_time, - }, - server::api::v1::{ - proof::{get_cached_input, set_cached_input, validate_cache_input}, - ProofResponse, - }, -}; - -#[global_allocator] -static ALLOCATOR: Cap = Cap::new(alloc::System, usize::MAX); - -fn default_address() -> String { - "0.0.0.0:8080".to_string() -} - -fn default_concurrency_limit() -> usize { - 16 -} - -fn default_max_log() -> usize { - 16 -} -fn default_config_path() -> PathBuf { - PathBuf::from("host/config/config.json") -} +use crate::{interfaces::HostResult, proof::ProofActor}; -fn default_log_level() -> String { - "info".to_string() -} +pub mod interfaces; +pub mod metrics; +pub mod proof; +pub mod server; #[derive(Default, Clone, Serialize, Deserialize, Debug, Parser)] #[command( @@ -62,15 +27,15 @@ fn default_log_level() -> String { long_about = None )] #[serde(default)] -pub struct Cli { +pub struct Opts { #[arg(long, require_equals = true, default_value = "0.0.0.0:8080")] - #[serde(default = "default_address")] + #[serde(default = "Opts::default_address")] /// Server bind address /// [default: 0.0.0.0:8080] address: String, #[arg(long, require_equals = true, default_value = "16")] - #[serde(default = "default_concurrency_limit")] + #[serde(default = "Opts::default_concurrency_limit")] /// Limit the max number of in-flight requests pub concurrency_limit: usize, @@ -78,11 +43,11 @@ pub struct Cli { pub log_path: Option, #[arg(long, require_equals = true, default_value = "7")] - #[serde(default = "default_max_log")] + #[serde(default = "Opts::default_max_log")] pub max_log: usize, #[arg(long, require_equals = true, default_value = "host/config/config.json")] - #[serde(default = "default_config_path")] + #[serde(default = "Opts::default_config_path")] /// Path to a config file that includes sufficient json args to request /// a proof of specified type. Curl json-rpc overrides its contents config_path: PathBuf, @@ -96,7 +61,7 @@ pub struct Cli { cache_path: Option, #[arg(long, require_equals = true, env = "RUST_LOG", default_value = "info")] - #[serde(default = "default_log_level")] + #[serde(default = "Opts::default_log_level")] /// Set the log level pub log_level: String, @@ -117,7 +82,27 @@ pub struct Cli { max_db_size: usize, } -impl Cli { +impl Opts { + fn default_address() -> String { + "0.0.0.0:8080".to_string() + } + + fn default_concurrency_limit() -> usize { + 16 + } + + fn default_max_log() -> usize { + 16 + } + + fn default_config_path() -> PathBuf { + PathBuf::from("host/config/config.json") + } + + fn default_log_level() -> String { + "info".to_string() + } + /// Read the options from a file and merge it with the current options. pub fn merge_from_file(&mut self) -> HostResult<()> { let file = std::fs::File::open(&self.config_path)?; @@ -131,17 +116,8 @@ impl Cli { } } -type TaskChannelOpts = (ProofRequest, Cli, SupportedChainSpecs); - -#[derive(Debug, Clone)] -pub struct ProverState { - pub opts: Cli, - pub chain_specs: SupportedChainSpecs, - pub task_channel: mpsc::Sender, -} - -impl From for TaskManagerOpts { - fn from(val: Cli) -> Self { +impl From for TaskManagerOpts { + fn from(val: Opts) -> Self { Self { sqlite_file: val.sqlite_file, max_db_size: val.max_db_size, @@ -149,8 +125,8 @@ impl From for TaskManagerOpts { } } -impl From<&Cli> for TaskManagerOpts { - fn from(val: &Cli) -> Self { +impl From<&Opts> for TaskManagerOpts { + fn from(val: &Opts) -> Self { Self { sqlite_file: val.sqlite_file.clone(), max_db_size: val.max_db_size, @@ -158,10 +134,19 @@ impl From<&Cli> for TaskManagerOpts { } } +pub type TaskChannelOpts = (ProofRequest, Opts, SupportedChainSpecs); + +#[derive(Debug, Clone)] +pub struct ProverState { + pub opts: Opts, + pub chain_specs: SupportedChainSpecs, + pub task_channel: mpsc::Sender, +} + impl ProverState { pub fn init() -> HostResult { // Read the command line arguments; - let mut opts = Cli::parse(); + let mut opts = Opts::parse(); // Read the config file. opts.merge_from_file()?; @@ -178,78 +163,12 @@ impl ProverState { } } - let (task_channel, mut receiver) = mpsc::channel::(opts.concurrency_limit); - - let _spawn = tokio::spawn(async move { - while let Some((proof_request, opts, chain_specs)) = receiver.recv().await { - let Ok((chain_id, blockhash)) = get_task_data( - &proof_request.network, - proof_request.block_number, - &chain_specs, - ) - .await - else { - error!("Could not retrieve chain ID and blockhash"); - continue; - }; - let mut manager = get_task_manager(&opts.clone().into()); - if manager - .update_task_progress( - chain_id, - blockhash, - proof_request.proof_type, - Some(proof_request.prover.to_string()), - TaskStatus::WorkInProgress, - None, - ) - .await - .is_err() - { - error!("Could not update task to work in progress via task manager"); - } - match handle_proof(&proof_request, &opts, &chain_specs).await { - Ok(proof) => { - let proof = proof.proof.unwrap_or_default(); - let proof = proof.as_bytes(); - debug!( - "Proof generated for block {} on {} is {:?}", - proof_request.block_number, proof_request.network, proof - ); - if manager - .update_task_progress( - chain_id, - blockhash, - proof_request.proof_type, - Some(proof_request.prover.to_string()), - TaskStatus::Success, - Some(proof), - ) - .await - .is_err() - { - error!("Could not update task progress to success via task manager"); - } - } - Err(error) => { - if manager - .update_task_progress( - chain_id, - blockhash, - proof_request.proof_type, - Some(proof_request.prover.to_string()), - error.into(), - None, - ) - .await - .is_err() - { - error!( - "Could not update task progress to error state via task manager" - ); - } - } - } - } + let (task_channel, receiver) = mpsc::channel::(opts.concurrency_limit); + + tokio::spawn(async move { + ProofActor::new(receiver, opts.concurrency_limit) + .run() + .await; }); Ok(Self { @@ -260,102 +179,8 @@ impl ProverState { } } -pub async fn handle_proof( - proof_request: &ProofRequest, - opts: &Cli, - chain_specs: &SupportedChainSpecs, -) -> HostResult { - inc_host_req_count(proof_request.block_number); - inc_guest_req_count(&proof_request.proof_type, proof_request.block_number); - - info!( - "# Generating proof for block {} on {}", - proof_request.block_number, proof_request.network - ); - - // Check for a cached input for the given request config. - let cached_input = get_cached_input( - &opts.cache_path, - proof_request.block_number, - &proof_request.network.to_string(), - ); - - let l1_chain_spec = chain_specs - .get_chain_spec(&proof_request.l1_network.to_string()) - .ok_or_else(|| HostError::InvalidRequestConfig("Unsupported l1 network".to_string()))?; - - let taiko_chain_spec = chain_specs - .get_chain_spec(&proof_request.network.to_string()) - .ok_or_else(|| HostError::InvalidRequestConfig("Unsupported raiko network".to_string()))?; - - // Execute the proof generation. - let total_time = Measurement::start("", false); - - let raiko = Raiko::new( - l1_chain_spec.clone(), - taiko_chain_spec.clone(), - proof_request.clone(), - ); - let provider = RpcBlockDataProvider::new( - &taiko_chain_spec.rpc.clone(), - proof_request.block_number - 1, - )?; - let input = match validate_cache_input(cached_input, &provider).await { - Ok(cache_input) => cache_input, - Err(_) => { - // no valid cache - memory::reset_stats(); - let measurement = Measurement::start("Generating input...", false); - let input = raiko.generate_input(provider).await?; - let input_time = measurement.stop_with("=> Input generated"); - observe_prepare_input_time(proof_request.block_number, input_time, true); - memory::print_stats("Input generation peak memory used: "); - input - } - }; - memory::reset_stats(); - let output = raiko.get_output(&input)?; - memory::print_stats("Guest program peak memory used: "); - - memory::reset_stats(); - let measurement = Measurement::start("Generating proof...", false); - let proof = raiko.prove(input.clone(), &output).await.map_err(|e| { - let total_time = total_time.stop_with("====> Proof generation failed"); - observe_total_time(proof_request.block_number, total_time, false); - match e { - RaikoError::Guest(e) => { - inc_guest_error(&proof_request.proof_type, proof_request.block_number); - HostError::Core(e.into()) - } - e => { - inc_host_error(proof_request.block_number); - e.into() - } - } - })?; - let guest_time = measurement.stop_with("=> Proof generated"); - observe_guest_time( - &proof_request.proof_type, - proof_request.block_number, - guest_time, - true, - ); - memory::print_stats("Prover peak memory used: "); - - inc_guest_success(&proof_request.proof_type, proof_request.block_number); - let total_time = total_time.stop_with("====> Complete proof generated"); - observe_total_time(proof_request.block_number, total_time, true); - - // Cache the input for future use. - set_cached_input( - &opts.cache_path, - proof_request.block_number, - &proof_request.network.to_string(), - &input, - )?; - - ProofResponse::try_from(proof) -} +#[global_allocator] +static ALLOCATOR: Cap = Cap::new(alloc::System, usize::MAX); mod memory { use tracing::debug; @@ -372,10 +197,8 @@ mod memory { pub(crate) fn print_stats(title: &str) { let max_memory = get_max_allocated(); - debug!( - "{title}{}.{:06} MB", - max_memory / 1_000_000, - max_memory % 1_000_000 - ); + let mbs = max_memory / 1_000_000; + let kbs = max_memory % 1_000_000; + debug!("{title}{mbs}.{kbs:06} MB"); } } diff --git a/host/src/proof.rs b/host/src/proof.rs new file mode 100644 index 000000000..41a6df550 --- /dev/null +++ b/host/src/proof.rs @@ -0,0 +1,212 @@ +use std::sync::Arc; + +use raiko_core::{ + interfaces::{ProofRequest, RaikoError}, + provider::{get_task_data, rpc::RpcBlockDataProvider}, + Raiko, +}; +use raiko_lib::{consts::SupportedChainSpecs, Measurement}; +use raiko_task_manager::{get_task_manager, TaskManager, TaskStatus}; +use tokio::sync::{mpsc::Receiver, Semaphore}; +use tracing::{error, info}; + +use crate::{ + interfaces::{HostError, HostResult}, + memory, + metrics::{ + inc_guest_error, inc_guest_success, inc_host_error, observe_guest_time, + observe_prepare_input_time, observe_total_time, + }, + server::api::v1::{ + proof::{get_cached_input, set_cached_input, validate_cache_input}, + ProofResponse, + }, + Opts, TaskChannelOpts, +}; + +pub struct ProofActor { + rx: Receiver, + task_count: usize, +} + +impl ProofActor { + pub fn new(rx: Receiver, task_count: usize) -> Self { + Self { rx, task_count } + } + + pub async fn run(&mut self) { + let semaphore = Arc::new(Semaphore::new(self.task_count)); + while let Some(message) = self.rx.recv().await { + let permit = Arc::clone(&semaphore).acquire_owned().await; + tokio::spawn(async move { + let _permit = permit; + if let Err(error) = Self::handle_message(message).await { + error!("Worker failed due to: {error:?}"); + } + }); + } + } + + pub async fn handle_message( + (proof_request, opts, chain_specs): TaskChannelOpts, + ) -> HostResult<()> { + let (chain_id, blockhash) = get_task_data( + &proof_request.network, + proof_request.block_number, + &chain_specs, + ) + .await?; + let mut manager = get_task_manager(&opts.clone().into()); + let status = manager + .get_task_proving_status( + chain_id, + blockhash, + proof_request.proof_type, + Some(proof_request.prover.clone().to_string()), + ) + .await?; + + if let Some(latest_status) = status.iter().last() { + if !matches!(latest_status.0, TaskStatus::Registered) { + return Ok(()); + } + } + + manager + .update_task_progress( + chain_id, + blockhash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + TaskStatus::WorkInProgress, + None, + ) + .await?; + + match handle_proof(&proof_request, &opts, &chain_specs).await { + Ok(result) => { + let proof_string = result.proof.unwrap_or_default(); + let proof = proof_string.as_bytes(); + + manager + .update_task_progress( + chain_id, + blockhash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + TaskStatus::Success, + Some(proof), + ) + .await?; + } + Err(error) => { + manager + .update_task_progress( + chain_id, + blockhash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + error.into(), + None, + ) + .await?; + } + } + + Ok(()) + } +} + +pub async fn handle_proof( + proof_request: &ProofRequest, + opts: &Opts, + chain_specs: &SupportedChainSpecs, +) -> HostResult { + info!( + "# Generating proof for block {} on {}", + proof_request.block_number, proof_request.network + ); + + // Check for a cached input for the given request config. + let cached_input = get_cached_input( + &opts.cache_path, + proof_request.block_number, + &proof_request.network.to_string(), + ); + + let l1_chain_spec = chain_specs + .get_chain_spec(&proof_request.l1_network.to_string()) + .ok_or_else(|| HostError::InvalidRequestConfig("Unsupported l1 network".to_string()))?; + + let taiko_chain_spec = chain_specs + .get_chain_spec(&proof_request.network.to_string()) + .ok_or_else(|| HostError::InvalidRequestConfig("Unsupported raiko network".to_string()))?; + + // Execute the proof generation. + let total_time = Measurement::start("", false); + + let raiko = Raiko::new( + l1_chain_spec.clone(), + taiko_chain_spec.clone(), + proof_request.clone(), + ); + let provider = RpcBlockDataProvider::new( + &taiko_chain_spec.rpc.clone(), + proof_request.block_number - 1, + )?; + let input = match validate_cache_input(cached_input, &provider).await { + Ok(cache_input) => cache_input, + Err(_) => { + // no valid cache + memory::reset_stats(); + let measurement = Measurement::start("Generating input...", false); + let input = raiko.generate_input(provider).await?; + let input_time = measurement.stop_with("=> Input generated"); + observe_prepare_input_time(proof_request.block_number, input_time, true); + memory::print_stats("Input generation peak memory used: "); + input + } + }; + memory::reset_stats(); + let output = raiko.get_output(&input)?; + memory::print_stats("Guest program peak memory used: "); + + memory::reset_stats(); + let measurement = Measurement::start("Generating proof...", false); + let proof = raiko.prove(input.clone(), &output).await.map_err(|e| { + let total_time = total_time.stop_with("====> Proof generation failed"); + observe_total_time(proof_request.block_number, total_time, false); + match e { + RaikoError::Guest(e) => { + inc_guest_error(&proof_request.proof_type, proof_request.block_number); + HostError::Core(e.into()) + } + e => { + inc_host_error(proof_request.block_number); + e.into() + } + } + })?; + let guest_time = measurement.stop_with("=> Proof generated"); + observe_guest_time( + &proof_request.proof_type, + proof_request.block_number, + guest_time, + true, + ); + memory::print_stats("Prover peak memory used: "); + + inc_guest_success(&proof_request.proof_type, proof_request.block_number); + let total_time = total_time.stop_with("====> Complete proof generated"); + observe_total_time(proof_request.block_number, total_time, true); + + // Cache the input for future use. + set_cached_input( + &opts.cache_path, + proof_request.block_number, + &proof_request.network.to_string(), + &input, + )?; + + ProofResponse::try_from(proof) +} diff --git a/host/src/server/api/v2/mod.rs b/host/src/server/api/v2/mod.rs index 47d9b49ff..22aaa36c6 100644 --- a/host/src/server/api/v2/mod.rs +++ b/host/src/server/api/v2/mod.rs @@ -1,10 +1,12 @@ -use axum::Router; -use utoipa::OpenApi; +use axum::{response::IntoResponse, Json, Router}; +use raiko_task_manager::TaskStatus; +use serde::{Serialize, Serializer}; +use utoipa::{OpenApi, ToSchema}; use utoipa_scalar::{Scalar, Servable}; use utoipa_swagger_ui::SwaggerUi; use crate::{ - server::api::v1::{self, GuestOutputDoc, ProofResponse, Status}, + server::api::v1::{self, GuestOutputDoc}, ProverState, }; @@ -23,7 +25,7 @@ mod proof; ), license( name = "MIT", - url = "https://github.com/taikoxyz/raiko/blob/taiko/unstable/LICENSE" + url = "https://github.com/taikoxyz/raiko/blob/main/LICENSE" ), ), components( @@ -33,6 +35,9 @@ mod proof; crate::interfaces::HostError, GuestOutputDoc, ProofResponse, + TaskStatus, + CancelStatus, + PruneStatus, Status, ) ), @@ -45,6 +50,95 @@ mod proof; /// The root API struct which is generated from the `OpenApi` derive macro. pub struct Docs; +#[derive(Debug, Serialize, ToSchema)] +#[serde(untagged)] +pub enum ProofResponse { + Status { + /// The status of the submitted task. + status: TaskStatus, + }, + Proof { + #[serde(serialize_with = "ProofResponse::serialize_proof")] + /// The proof. + proof: Option>, + }, +} + +impl ProofResponse { + fn serialize_proof(proof: &Option>, serializer: S) -> Result + where + S: Serializer, + { + match proof { + Some(value) => serializer.serialize_str(&String::from_utf8(value.clone()).unwrap()), + None => serializer.serialize_str(""), + } + } +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(tag = "status", rename_all = "lowercase")] +pub enum Status { + Ok { data: ProofResponse }, + Error { error: String, message: String }, +} + +impl From> for Status { + fn from(proof: Vec) -> Self { + Self::Ok { + data: ProofResponse::Proof { proof: Some(proof) }, + } + } +} + +impl From for Status { + fn from(status: TaskStatus) -> Self { + Self::Ok { + data: ProofResponse::Status { status }, + } + } +} + +impl IntoResponse for Status { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(tag = "status", rename_all = "lowercase")] +/// Status of cancellation request. +/// Can be `ok` for a successful cancellation or `error` with message and error type for errors. +pub enum CancelStatus { + /// Cancellation was successful. + Ok, + /// Cancellation failed. + Error { error: String, message: String }, +} + +impl IntoResponse for CancelStatus { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(tag = "status", rename_all = "lowercase")] +/// Status of prune request. +/// Can be `ok` for a successful prune or `error` with message and error type for errors. +pub enum PruneStatus { + /// Prune was successful. + Ok, + /// Prune failed. + Error { error: String, message: String }, +} + +impl IntoResponse for PruneStatus { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + #[must_use] pub fn create_docs() -> utoipa::openapi::OpenApi { [ diff --git a/host/src/server/api/v2/proof/cancel.rs b/host/src/server/api/v2/proof/cancel.rs new file mode 100644 index 000000000..209fdb60b --- /dev/null +++ b/host/src/server/api/v2/proof/cancel.rs @@ -0,0 +1,70 @@ +use axum::{debug_handler, extract::State, routing::post, Json, Router}; +use raiko_core::{interfaces::ProofRequest, provider::get_task_data}; +use raiko_task_manager::{get_task_manager, TaskManager, TaskStatus}; +use serde_json::Value; +use utoipa::OpenApi; + +use crate::{interfaces::HostResult, server::api::v2::CancelStatus, ProverState}; + +#[utoipa::path(post, path = "/proof/cancel", + tag = "Proving", + request_body = ProofRequestOpt, + responses ( + (status = 200, description = "Successfully cancelled proof task", body = CancelStatus) + ) +)] +#[debug_handler(state = ProverState)] +/// Cancel a proof task with requested config. +/// +/// Accepts a proof request and cancels a proving task with the specified guest prover. +/// The guest provers currently available are: +/// - native - constructs a block and checks for equality +/// - sgx - uses the sgx environment to construct a block and produce proof of execution +/// - sp1 - uses the sp1 prover +/// - risc0 - uses the risc0 prover +async fn cancel_handler( + State(prover_state): State, + Json(req): Json, +) -> HostResult { + // Override the existing proof request config from the config file and command line + // options with the request from the client. + let mut config = prover_state.opts.proof_request_opt.clone(); + config.merge(&req)?; + + // Construct the actual proof request from the available configs. + let proof_request = ProofRequest::try_from(config)?; + + let (chain_id, block_hash) = get_task_data( + &proof_request.network, + proof_request.block_number, + &prover_state.chain_specs, + ) + .await?; + + let mut manager = get_task_manager(&(&prover_state.opts).into()); + + manager + .update_task_progress( + chain_id, + block_hash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + TaskStatus::Cancelled, + None, + ) + .await?; + + Ok(CancelStatus::Ok) +} + +#[derive(OpenApi)] +#[openapi(paths(cancel_handler))] +struct Docs; + +pub fn create_docs() -> utoipa::openapi::OpenApi { + Docs::openapi() +} + +pub fn create_router() -> Router { + Router::new().route("/", post(cancel_handler)) +} diff --git a/host/src/server/api/v2/proof.rs b/host/src/server/api/v2/proof/mod.rs similarity index 53% rename from host/src/server/api/v2/proof.rs rename to host/src/server/api/v2/proof/mod.rs index 6460ffddc..b2f787fa0 100644 --- a/host/src/server/api/v2/proof.rs +++ b/host/src/server/api/v2/proof/mod.rs @@ -1,23 +1,27 @@ use axum::{debug_handler, extract::State, routing::post, Json, Router}; -use raiko_core::interfaces::ProofRequest; -use raiko_core::provider::get_task_data; -use raiko_task_manager::{get_task_manager, EnqueueTaskParams, TaskManager, TaskStatus}; +use raiko_core::{interfaces::ProofRequest, provider::get_task_data}; +use raiko_task_manager::{ + get_task_manager, EnqueueTaskParams, TaskManager, TaskProvingStatus, TaskStatus, +}; use serde_json::Value; -use tracing::info; use utoipa::OpenApi; use crate::{ interfaces::HostResult, metrics::{inc_current_req, inc_guest_req_count, inc_host_req_count}, - server::api::v1::ProofResponse, + server::api::v2::Status, ProverState, }; +mod cancel; +mod prune; +mod report; + #[utoipa::path(post, path = "/proof", tag = "Proving", request_body = ProofRequestOpt, responses ( - (status = 200, description = "Successfully submitted proof task", body = Status) + (status = 200, description = "Successfully submitted proof task, queried tasks in progress or retrieved proof.", body = Status) ) )] #[debug_handler(state = ProverState)] @@ -32,7 +36,7 @@ use crate::{ async fn proof_handler( State(prover_state): State, Json(req): Json, -) -> HostResult> { +) -> HostResult { inc_current_req(); // Override the existing proof request config from the config file and command line // options with the request from the client. @@ -61,12 +65,8 @@ async fn proof_handler( ) .await?; - if status.is_empty() { - info!( - "# Generating proof for block {} on {}", - proof_request.block_number, proof_request.network - ); - + let Some(TaskProvingStatus(latest_status, ..)) = status.last() else { + // If there are no tasks with provided config, create a new one. manager .enqueue_task(&EnqueueTaskParams { chain_id, @@ -83,45 +83,49 @@ async fn proof_handler( prover_state.chain_specs, ))?; - return Ok(Json(serde_json::json!( - { - "status": "ok", - "data": { - "status": TaskStatus::Registered, - } - } - ))); - } - - let status = status.last().unwrap().0; - - if matches!(status, TaskStatus::Success) { - let proof = manager - .get_task_proof( - chain_id, - block_hash, - proof_request.proof_type, - Some(proof_request.prover.to_string()), - ) - .await?; - - let response = ProofResponse { - proof: Some(String::from_utf8(proof).unwrap()), - output: None, - quote: None, - }; - - return Ok(Json(response.to_response())); - } - - Ok(Json(serde_json::json!( - { - "status": "ok", - "data": { - "status": status, - } + return Ok(TaskStatus::Registered.into()); + }; + + match latest_status { + // If task has been cancelled add it to the queue again + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress => { + manager + .enqueue_task(&EnqueueTaskParams { + chain_id, + blockhash: block_hash, + proof_type: proof_request.proof_type, + prover: proof_request.prover.to_string(), + block_number: proof_request.block_number, + }) + .await?; + + prover_state.task_channel.try_send(( + proof_request.clone(), + prover_state.opts, + prover_state.chain_specs, + ))?; + + Ok(TaskStatus::Registered.into()) + } + // If the task has succeeded, return the proof. + TaskStatus::Success => { + let proof = manager + .get_task_proof( + chain_id, + block_hash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + ) + .await?; + + Ok(proof.into()) } - ))) + // For all other statuses just return the status. + status => Ok((*status).into()), + } } #[derive(OpenApi)] @@ -129,9 +133,22 @@ async fn proof_handler( struct Docs; pub fn create_docs() -> utoipa::openapi::OpenApi { - Docs::openapi() + [ + cancel::create_docs(), + report::create_docs(), + prune::create_docs(), + ] + .into_iter() + .fold(Docs::openapi(), |mut docs, curr| { + docs.merge(curr); + docs + }) } pub fn create_router() -> Router { - Router::new().route("/", post(proof_handler)) + Router::new() + .route("/", post(proof_handler)) + .nest("/cancel", cancel::create_router()) + .nest("/report", report::create_router()) + .nest("/prune", prune::create_router()) } diff --git a/host/src/server/api/v2/proof/prune.rs b/host/src/server/api/v2/proof/prune.rs new file mode 100644 index 000000000..c58482808 --- /dev/null +++ b/host/src/server/api/v2/proof/prune.rs @@ -0,0 +1,33 @@ +use axum::{debug_handler, extract::State, routing::post, Router}; +use raiko_task_manager::{get_task_manager, TaskManager}; +use utoipa::OpenApi; + +use crate::{interfaces::HostResult, server::api::v2::PruneStatus, ProverState}; + +#[utoipa::path(post, path = "/proof/prune", + tag = "Proving", + responses ( + (status = 200, description = "Successfully pruned tasks", body = PruneStatus) + ) +)] +#[debug_handler(state = ProverState)] +/// Prune all tasks. +async fn prune_handler(State(prover_state): State) -> HostResult { + let mut manager = get_task_manager(&(&prover_state.opts).into()); + + manager.prune_db().await?; + + Ok(PruneStatus::Ok) +} + +#[derive(OpenApi)] +#[openapi(paths(prune_handler))] +struct Docs; + +pub fn create_docs() -> utoipa::openapi::OpenApi { + Docs::openapi() +} + +pub fn create_router() -> Router { + Router::new().route("/", post(prune_handler)) +} diff --git a/host/src/server/api/v2/proof/report.rs b/host/src/server/api/v2/proof/report.rs new file mode 100644 index 000000000..2f84afae8 --- /dev/null +++ b/host/src/server/api/v2/proof/report.rs @@ -0,0 +1,36 @@ +use axum::{debug_handler, extract::State, routing::post, Json, Router}; +use raiko_task_manager::{get_task_manager, TaskManager}; +use serde_json::Value; +use utoipa::OpenApi; + +use crate::{interfaces::HostResult, ProverState}; + +#[utoipa::path(post, path = "/proof/report", + tag = "Proving", + responses ( + (status = 200, description = "Successfully listed all current tasks", body = CancelStatus) + ) +)] +#[debug_handler(state = ProverState)] +/// List all tasks. +/// +/// Retrieve a list of `{ chain_id, blockhash, prover_type, prover, status }` items. +async fn report_handler(State(prover_state): State) -> HostResult> { + let mut manager = get_task_manager(&(&prover_state.opts).into()); + + let task_report = manager.list_all_tasks().await?; + + Ok(Json(serde_json::to_value(task_report)?)) +} + +#[derive(OpenApi)] +#[openapi(paths(report_handler))] +struct Docs; + +pub fn create_docs() -> utoipa::openapi::OpenApi { + Docs::openapi() +} + +pub fn create_router() -> Router { + Router::new().route("/", post(report_handler)) +} diff --git a/lib/src/utils.rs b/lib/src/utils.rs index 14ff9bff0..1fd370103 100644 --- a/lib/src/utils.rs +++ b/lib/src/utils.rs @@ -1,11 +1,10 @@ -use std::io::Read; -use std::io::Write; +use std::io::{Read, Write}; use alloy_rlp::Decodable; use anyhow::Result; -use libflate::zlib::Decoder as zlibDecoder; -use libflate::zlib::Encoder as zlibEncoder; +use libflate::zlib::{Decoder as zlibDecoder, Encoder as zlibEncoder}; use reth_primitives::TransactionSigned; +use tracing::warn; use crate::consts::{ChainSpec, Network}; #[cfg(not(feature = "std"))] @@ -15,7 +14,7 @@ pub fn decode_transactions(tx_list: &[u8]) -> Vec { #[allow(clippy::useless_asref)] Vec::::decode(&mut tx_list.as_ref()).unwrap_or_else(|e| { // If decoding fails we need to make an empty block - println!("decode_transactions not successful: {e:?}, use empty tx_list"); + warn!("decode_transactions not successful: {e:?}, use empty tx_list"); vec![] }) } @@ -38,14 +37,14 @@ fn get_tx_list(chain_spec: &ChainSpec, is_blob_data: bool, tx_list: &[u8]) -> Ve if validate_calldata_tx_list(&tx_list) { tx_list } else { - println!("validate_calldata_tx_list failed, use empty tx_list"); + warn!("validate_calldata_tx_list failed, use empty tx_list"); vec![] } } else { if validate_calldata_tx_list(tx_list) { zlib_decompress_data(tx_list).unwrap_or_default() } else { - println!("validate_calldata_tx_list failed, use empty tx_list"); + warn!("validate_calldata_tx_list failed, use empty tx_list"); vec![] } } diff --git a/pipeline/src/builder.rs b/pipeline/src/builder.rs index ac3ead89f..7560fcf9d 100644 --- a/pipeline/src/builder.rs +++ b/pipeline/src/builder.rs @@ -240,7 +240,8 @@ impl CommandBuilder { .meta .target_directory .join(self.target.clone()) - .join(profile); + .join(profile) + .into(); let artifacts = self .meta @@ -279,7 +280,8 @@ impl CommandBuilder { .target_directory .join(self.target.clone()) .join(profile) - .join("deps"); + .join("deps") + .into(); println!("tests {bins:?}"); diff --git a/provers/sgx/setup/Cargo.toml b/provers/sgx/setup/Cargo.toml index 2f1a94af4..10dfd2d4c 100644 --- a/provers/sgx/setup/Cargo.toml +++ b/provers/sgx/setup/Cargo.toml @@ -33,7 +33,6 @@ serde = { workspace = true } serde_with = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -hyper = { workspace = true } env_logger = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/task_manager/Cargo.toml b/task_manager/Cargo.toml index ec888c35e..8fe4187f3 100644 --- a/task_manager/Cargo.toml +++ b/task_manager/Cargo.toml @@ -18,6 +18,7 @@ tracing = { workspace = true } anyhow = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } +utoipa = { workspace = true } [dev-dependencies] rand = "0.9.0-alpha.1" # This is an alpha version, that has rng.gen_iter::() diff --git a/task_manager/src/adv_sqlite.rs b/task_manager/src/adv_sqlite.rs index 25cd215a4..692cb543f 100644 --- a/task_manager/src/adv_sqlite.rs +++ b/task_manager/src/adv_sqlite.rs @@ -167,8 +167,8 @@ use rusqlite::{ use tokio::sync::Mutex; use crate::{ - EnqueueTaskParams, TaskManager, TaskManagerError, TaskManagerOpts, TaskManagerResult, - TaskProvingStatus, TaskProvingStatusRecords, TaskStatus, + EnqueueTaskParams, TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts, + TaskManagerResult, TaskProvingStatus, TaskProvingStatusRecords, TaskReport, TaskStatus, }; // Types @@ -251,9 +251,9 @@ impl TaskDb { proofsys(id, desc) VALUES (0, 'Native'), - (1, 'Risc0'), - (2, 'SP1'), - (3, 'SGX'); + (1, 'SP1'), + (2, 'SGX'), + (3, 'Risc0'); CREATE TABLE status_codes( id INTEGER UNIQUE NOT NULL PRIMARY KEY, @@ -612,41 +612,6 @@ impl TaskDb { Ok(query.collect::, _>>()?) } - pub fn get_task_proving_status_by_id( - &self, - task_id: u64, - ) -> TaskManagerResult { - let mut statement = self.conn.prepare_cached( - r#" - SELECT - ts.status_id, - t.prover, - timestamp - FROM - task_status ts - LEFT JOIN tasks t ON ts.task_id = t.id - WHERE - t.id = :task_id - ORDER BY - ts.timestamp; - "#, - )?; - let query = statement.query_map( - named_params! { - ":task_id": task_id, - }, - |row| { - Ok(TaskProvingStatus( - TaskStatus::from(row.get::<_, i32>(0)?), - Some(row.get::<_, String>(1)?), - row.get::<_, DateTime>(2)?, - )) - }, - )?; - - Ok(query.collect::, _>>()?) - } - pub fn get_task_proof( &self, chain_id: ChainId, @@ -683,30 +648,6 @@ impl TaskDb { Ok(query) } - pub fn get_task_proof_by_id(&self, task_id: u64) -> TaskManagerResult> { - let mut statement = self.conn.prepare_cached( - r#" - SELECT - proof - FROM - task_proofs tp - LEFT JOIN tasks t ON tp.task_id = t.id - WHERE - t.id = :task_id - LIMIT - 1; - "#, - )?; - let query = statement.query_row( - named_params! { - ":task_id": task_id, - }, - |row| row.get(0), - )?; - - Ok(query) - } - pub fn get_db_size(&self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { let mut statement = self.conn.prepare_cached( r#" @@ -745,6 +686,47 @@ impl TaskDb { Ok(()) } + + pub fn list_all_tasks(&self) -> TaskManagerResult> { + let mut statement = self.conn.prepare_cached( + r#" + SELECT + chain_id, + blockhash, + proofsys_id, + prover, + status_id + FROM + tasks + LEFT JOIN task_status on task.id = task_status.task_id + JOIN ( + SELECT + task_id, + MAX(timestamp) as latest_timestamp + FROM + task_status + GROUP BY + task_id + ) latest_ts ON task_status.task_id = latest_ts.task_id + AND task_status.timestamp = latest_ts.latest_timestamp + "#, + )?; + let query = statement + .query_map([], |row| { + Ok(TaskReport( + TaskDescriptor { + chain_id: row.get(0)?, + blockhash: B256::from_slice(&row.get::<_, Vec>(1)?), + proof_system: row.get::<_, u8>(2)?.try_into().unwrap(), + prover: row.get(3)?, + }, + TaskStatus::from(row.get::<_, i32>(4)?), + )) + })? + .collect::, _>>()?; + + Ok(query) + } } #[async_trait::async_trait] @@ -799,15 +781,6 @@ impl TaskManager for SqliteTaskManager { task_db.get_task_proving_status(chain_id, blockhash, proof_type, prover) } - /// Returns the latest triplet (submitter or fulfiller, status, last update time) - async fn get_task_proving_status_by_id( - &mut self, - task_id: u64, - ) -> TaskManagerResult { - let task_db = self.arc_task_db.lock().await; - task_db.get_task_proving_status_by_id(task_id) - } - async fn get_task_proof( &mut self, chain_id: ChainId, @@ -819,11 +792,6 @@ impl TaskManager for SqliteTaskManager { task_db.get_task_proof(chain_id, blockhash, proof_type, prover) } - async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { - let task_db = self.arc_task_db.lock().await; - task_db.get_task_proof_by_id(task_id) - } - /// Returns the total and detailed database size async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { let task_db = self.arc_task_db.lock().await; @@ -834,6 +802,11 @@ impl TaskManager for SqliteTaskManager { let task_db = self.arc_task_db.lock().await; task_db.prune_db() } + + async fn list_all_tasks(&mut self) -> TaskManagerResult> { + let task_db = self.arc_task_db.lock().await; + task_db.list_all_tasks() + } } #[cfg(test)] diff --git a/task_manager/src/lib.rs b/task_manager/src/lib.rs index 42275c20f..d0e7fdbb5 100644 --- a/task_manager/src/lib.rs +++ b/task_manager/src/lib.rs @@ -9,6 +9,7 @@ use raiko_core::interfaces::ProofType; use raiko_lib::primitives::{ChainId, B256}; use rusqlite::Error as SqlError; use serde::Serialize; +use utoipa::ToSchema; use crate::{adv_sqlite::SqliteTaskManager, mem_db::InMemoryTaskManager}; @@ -55,8 +56,9 @@ impl From for TaskManagerError { #[allow(non_camel_case_types)] #[rustfmt::skip] -#[derive(PartialEq, Debug, Copy, Clone, IntoPrimitive, FromPrimitive, Serialize)] +#[derive(PartialEq, Debug, Copy, Clone, IntoPrimitive, FromPrimitive, Serialize, ToSchema)] #[repr(i32)] +#[serde(rename_all = "snake_case")] pub enum TaskStatus { Success = 0, Registered = 1000, @@ -83,7 +85,7 @@ pub struct EnqueueTaskParams { pub block_number: u64, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] pub struct TaskDescriptor { pub chain_id: ChainId, pub blockhash: B256, @@ -144,6 +146,9 @@ pub struct TaskManagerOpts { pub max_db_size: usize, } +#[derive(Debug, Serialize, ToSchema)] +pub struct TaskReport(pub TaskDescriptor, pub TaskStatus); + #[async_trait::async_trait] pub trait TaskManager { /// new a task manager @@ -175,12 +180,6 @@ pub trait TaskManager { prover: Option, ) -> TaskManagerResult; - /// Returns the latest triplet (submitter or fulfiller, status, last update time) - async fn get_task_proving_status_by_id( - &mut self, - task_id: u64, - ) -> TaskManagerResult; - /// Returns the proof for the given task async fn get_task_proof( &mut self, @@ -190,13 +189,13 @@ pub trait TaskManager { prover: Option, ) -> TaskManagerResult>; - async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult>; - /// Returns the total and detailed database size async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)>; /// Prune old tasks async fn prune_db(&mut self) -> TaskManagerResult<()>; + + async fn list_all_tasks(&mut self) -> TaskManagerResult>; } pub fn ensure(expression: bool, message: &str) -> TaskManagerResult<()> { @@ -281,20 +280,6 @@ impl TaskManager for TaskManagerWrapper { } } - async fn get_task_proving_status_by_id( - &mut self, - task_id: u64, - ) -> TaskManagerResult { - match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => { - manager.get_task_proving_status_by_id(task_id).await - } - TaskManagerInstance::Sqlite(ref mut manager) => { - manager.get_task_proving_status_by_id(task_id).await - } - } - } - async fn get_task_proof( &mut self, chain_id: ChainId, @@ -316,17 +301,6 @@ impl TaskManager for TaskManagerWrapper { } } - async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { - match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => { - manager.get_task_proof_by_id(task_id).await - } - TaskManagerInstance::Sqlite(ref mut manager) => { - manager.get_task_proof_by_id(task_id).await - } - } - } - async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { match &mut self.manager { TaskManagerInstance::InMemory(ref mut manager) => manager.get_db_size().await, @@ -340,6 +314,13 @@ impl TaskManager for TaskManagerWrapper { TaskManagerInstance::Sqlite(ref mut manager) => manager.prune_db().await, } } + + async fn list_all_tasks(&mut self) -> TaskManagerResult> { + match &mut self.manager { + TaskManagerInstance::InMemory(ref mut manager) => manager.list_all_tasks().await, + TaskManagerInstance::Sqlite(ref mut manager) => manager.list_all_tasks().await, + } + } } pub fn get_task_manager(opts: &TaskManagerOpts) -> TaskManagerWrapper { diff --git a/task_manager/src/mem_db.rs b/task_manager/src/mem_db.rs index e413b7ca0..146b54cb3 100644 --- a/task_manager/src/mem_db.rs +++ b/task_manager/src/mem_db.rs @@ -14,12 +14,12 @@ use std::{ use crate::{ ensure, EnqueueTaskParams, TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts, - TaskManagerResult, TaskProvingStatus, TaskProvingStatusRecords, TaskStatus, + TaskManagerResult, TaskProvingStatus, TaskProvingStatusRecords, TaskReport, TaskStatus, }; use chrono::Utc; use raiko_core::interfaces::ProofType; -use raiko_lib::primitives::{keccak::keccak, ChainId, B256}; +use raiko_lib::primitives::{ChainId, B256}; use tokio::sync::Mutex; use tracing::{debug, info}; @@ -30,22 +30,18 @@ pub struct InMemoryTaskManager { #[derive(Debug)] pub struct InMemoryTaskDb { - enqueue_task: HashMap, - task_id_desc: HashMap, - task_id: u64, + enqueue_task: HashMap, } impl InMemoryTaskDb { fn new() -> InMemoryTaskDb { InMemoryTaskDb { enqueue_task: HashMap::new(), - task_id_desc: HashMap::new(), - task_id: 0, } } fn enqueue_task(&mut self, params: &EnqueueTaskParams) { - let key: B256 = keccak(TaskDescriptor::from(params).to_vec()).into(); + let key = TaskDescriptor::from(params); let task_status = TaskProvingStatus( TaskStatus::Registered, Some(params.prover.clone()), @@ -62,8 +58,6 @@ impl InMemoryTaskDb { None => { info!("Enqueue new task: {:?}", params); self.enqueue_task.insert(key, vec![task_status]); - self.task_id_desc.insert(self.task_id, key); - self.task_id += 1; } } } @@ -77,26 +71,20 @@ impl InMemoryTaskDb { status: TaskStatus, proof: Option<&[u8]>, ) -> TaskManagerResult<()> { - let key: B256 = keccak( - TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())).to_vec(), - ) - .into(); + let key = TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())); ensure(self.enqueue_task.contains_key(&key), "no task found")?; - let task_proving_records = self.enqueue_task.get(&key).unwrap(); - let task_status = task_proving_records.last().unwrap().0; - if status != task_status { - let new_records = task_proving_records - .iter() - .cloned() - .chain(std::iter::once(TaskProvingStatus( - status, - proof.map(hex::encode), - Utc::now(), - ))) - .collect(); - self.enqueue_task.insert(key, new_records); - } + self.enqueue_task.entry(key).and_modify(|entry| { + if let Some(latest) = entry.last() { + if latest.0 != status { + entry.push(TaskProvingStatus( + status, + proof.map(hex::encode), + Utc::now(), + )); + } + } + }); Ok(()) } @@ -107,10 +95,7 @@ impl InMemoryTaskDb { proof_system: ProofType, prover: Option, ) -> TaskManagerResult { - let key: B256 = keccak( - TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())).to_vec(), - ) - .into(); + let key = TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())); match self.enqueue_task.get(&key) { Some(proving_status_records) => Ok(proving_status_records.clone()), @@ -118,16 +103,6 @@ impl InMemoryTaskDb { } } - fn get_task_proving_status_by_id( - &mut self, - task_id: u64, - ) -> TaskManagerResult { - ensure(self.task_id_desc.contains_key(&task_id), "no task found")?; - let key = self.task_id_desc.get(&task_id).unwrap(); - let task_status = self.enqueue_task.get(key).unwrap(); - Ok(task_status.clone()) - } - fn get_task_proof( &mut self, chain_id: ChainId, @@ -135,42 +110,38 @@ impl InMemoryTaskDb { proof_system: ProofType, prover: Option, ) -> TaskManagerResult> { - let key: B256 = keccak( - TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())).to_vec(), - ) - .into(); + let key = TaskDescriptor::from((chain_id, blockhash, proof_system, prover.clone())); ensure(self.enqueue_task.contains_key(&key), "no task found")?; - let proving_status_records = self.enqueue_task.get(&key).unwrap(); - let task_status = proving_status_records.last().unwrap(); - if task_status.0 == TaskStatus::Success { - let proof = task_status.1.clone().unwrap(); - Ok(hex::decode(proof).unwrap()) - } else { - Err(TaskManagerError::SqlError("working in process".to_owned())) - } - } + let Some(proving_status_records) = self.enqueue_task.get(&key) else { + return Err(TaskManagerError::SqlError("no task in db".to_owned())); + }; - fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { - ensure(self.task_id_desc.contains_key(&task_id), "no task found")?; - let key = self.task_id_desc.get(&task_id).unwrap(); - let task_records = self.enqueue_task.get(key).unwrap(); - let task_status = task_records.last().unwrap(); - if task_status.0 == TaskStatus::Success { - let proof = task_status.1.clone().unwrap(); - Ok(hex::decode(proof).unwrap()) - } else { - Err(TaskManagerError::SqlError("working in process".to_owned())) - } + proving_status_records + .last() + .map(|status| hex::decode(status.1.clone().unwrap()).unwrap()) + .ok_or_else(|| TaskManagerError::SqlError("working in progress".to_owned())) } fn size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { - Ok((self.enqueue_task.len() + self.task_id_desc.len(), vec![])) + Ok((self.enqueue_task.len(), vec![])) } fn prune(&mut self) -> TaskManagerResult<()> { Ok(()) } + + fn list_all_tasks(&mut self) -> TaskManagerResult> { + Ok(self + .enqueue_task + .iter() + .flat_map(|(descriptor, statuses)| { + statuses + .iter() + .map(|status| TaskReport(descriptor.clone(), status.0)) + }) + .collect()) + } } #[async_trait::async_trait] @@ -241,15 +212,6 @@ impl TaskManager for InMemoryTaskManager { db.get_task_proving_status(chain_id, blockhash, proof_system, prover) } - /// Returns the latest triplet (submitter or fulfiller, status, last update time) - async fn get_task_proving_status_by_id( - &mut self, - task_id: u64, - ) -> TaskManagerResult { - let mut db = self.db.lock().await; - db.get_task_proving_status_by_id(task_id) - } - async fn get_task_proof( &mut self, chain_id: ChainId, @@ -261,11 +223,6 @@ impl TaskManager for InMemoryTaskManager { db.get_task_proof(chain_id, blockhash, proof_system, prover) } - async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { - let mut db = self.db.lock().await; - db.get_task_proof_by_id(task_id) - } - /// Returns the total and detailed database size async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { let mut db = self.db.lock().await; @@ -276,6 +233,11 @@ impl TaskManager for InMemoryTaskManager { let mut db = self.db.lock().await; db.prune() } + + async fn list_all_tasks(&mut self) -> TaskManagerResult> { + let mut db = self.db.lock().await; + db.list_all_tasks() + } } #[cfg(test)]