From 3421001456d8de2f293765b7164d9cf8b333359c Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 4 Jan 2022 13:36:22 +0100 Subject: [PATCH] Integrate ya-negotiator library (work in progress) --- Cargo.lock | 337 +++++++++++--- agent/provider/Cargo.toml | 5 +- agent/provider/src/execution/task_runner.rs | 2 +- agent/provider/src/market/config.rs | 26 +- agent/provider/src/market/negotiator.rs | 13 +- .../src/market/negotiator/accept_all.rs | 55 --- .../provider/src/market/negotiator/builtin.rs | 20 + .../market/negotiator/builtin/expiration.rs | 185 +++++--- .../negotiator/builtin/max_agreements.rs | 66 ++- .../provider/src/market/negotiator/common.rs | 213 --------- .../src/market/negotiator/component.rs | 170 ------- .../src/market/negotiator/composite.rs | 176 -------- .../provider/src/market/negotiator/factory.rs | 64 --- agent/provider/src/market/provider_market.rs | 425 ++++++++++++------ .../provider/src/market/termination_reason.rs | 15 + agent/provider/src/payments/agreement.rs | 4 +- agent/provider/src/payments/payments.rs | 7 +- agent/provider/src/provider_agent.rs | 2 +- agent/provider/src/startup_config.rs | 4 +- agent/provider/src/tasks/task_info.rs | 2 +- agent/provider/src/tasks/task_manager.rs | 11 +- 21 files changed, 790 insertions(+), 1012 deletions(-) delete mode 100644 agent/provider/src/market/negotiator/accept_all.rs delete mode 100644 agent/provider/src/market/negotiator/component.rs delete mode 100644 agent/provider/src/market/negotiator/composite.rs delete mode 100644 agent/provider/src/market/negotiator/factory.rs diff --git a/Cargo.lock b/Cargo.lock index 33e2083517..b4dfe19560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,52 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "abi_stable" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c14e9b355e56a614082b552bf24c28b9db1ac50325c96125a1f2723575b39653" +dependencies = [ + "abi_stable_derive", + "abi_stable_shared", + "core_extensions", + "crossbeam-channel 0.5.0", + "generational-arena", + "libloading", + "lock_api 0.4.2", + "parking_lot 0.11.1", + "repr_offset", + "rustc_version", + "serde", + "serde_derive", + "serde_json", +] + +[[package]] +name = "abi_stable_derive" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3434fe2abd3d4c780fb4c23b747cdb8bc2fc1b52dc2329b7b40c1b62ae9f1ab" +dependencies = [ + "abi_stable_shared", + "as_derive_utils", + "core_extensions", + "proc-macro2", + "quote", + "rustc_version", + "syn", + "typed-arena", +] + +[[package]] +name = "abi_stable_shared" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a96112d6a13f37dd1cc2a2f07f7f12b7c3d176b6eec36e7987432a70fece8b48" +dependencies = [ + "core_extensions", +] + [[package]] name = "actix" version = "0.10.0" @@ -558,6 +604,18 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4dc07131ffa69b8072d35f5007352af944213cde02545e2103680baed38fcd" +[[package]] +name = "as_derive_utils" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc3613a62c7b739739a2cb1ee166ac275f16c5b86caf454ba21a2f79f04b025" +dependencies = [ + "core_extensions", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "asnom" version = "0.1.1" @@ -732,7 +790,7 @@ dependencies = [ "byteorder", "cfg-if 1.0.0", "crossbeam", - "futures 0.3.13", + "futures 0.3.19", "hex", "lazy_static", "num_cpus", @@ -1226,6 +1284,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +[[package]] +name = "core_extensions" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a117e1ca5d83ff939487e48e143c6a642cebd01a88f472a4ec7dc4773690fda" +dependencies = [ + "rustc_version", +] + [[package]] name = "cpuid-bool" version = "0.1.2" @@ -2116,9 +2183,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1" +checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" dependencies = [ "futures-channel", "futures-core", @@ -2131,9 +2198,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939" +checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" dependencies = [ "futures-core", "futures-sink", @@ -2141,15 +2208,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" +checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" [[package]] name = "futures-executor" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1" +checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" dependencies = [ "futures-core", "futures-task", @@ -2159,17 +2226,16 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" +checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" [[package]] name = "futures-macro" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7" +checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" dependencies = [ - "proc-macro-hack", "proc-macro2", "quote", "syn", @@ -2177,15 +2243,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3" +checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" [[package]] name = "futures-task" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80" +checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" [[package]] name = "futures-timer" @@ -2195,9 +2261,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.13" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1" +checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" dependencies = [ "futures 0.1.31", "futures-channel", @@ -2209,8 +2275,6 @@ dependencies = [ "memchr", "pin-project-lite 0.2.6", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -2223,6 +2287,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generational-arena" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d3b771574f62d0548cee0ad9057857e9fc25d7a3335f140c84f6acd0bf601" +dependencies = [ + "cfg-if 0.1.10", +] + [[package]] name = "generic-array" version = "0.9.1" @@ -2282,7 +2355,7 @@ dependencies = [ "digest 0.8.1", "dotenv", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "log", "rand 0.7.3", "serde", @@ -2359,7 +2432,7 @@ dependencies = [ "directories", "dotenv", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "lazy_static", "libc", "log", @@ -2573,6 +2646,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac34a56cfd4acddb469cc7fff187ed5ac36f498ba085caf8bbc725e3ff474058" +dependencies = [ + "humantime 2.1.0", + "serde", +] + [[package]] name = "hyper" version = "0.13.10" @@ -2628,7 +2711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93ec5be69758dfc06b9b29efa9d6e9306e387c85eb362c603912eead2ad98c7" dependencies = [ "bytes 0.5.6", - "futures 0.3.13", + "futures 0.3.19", "http", "hyper 0.13.10", "hyper-tls 0.4.3", @@ -2864,7 +2947,7 @@ version = "16.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a47c4c3ac843f9a4238943f97620619033dadef4b378cd1e8addd170de396b3" dependencies = [ - "futures 0.3.13", + "futures 0.3.19", "log", "serde", "serde_derive", @@ -2911,6 +2994,16 @@ version = "0.2.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790" +[[package]] +name = "libloading" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "351a32417a12d5f7e82c368a66781e307834dae04c6ce0cd4456d52989229883" +dependencies = [ + "cfg-if 1.0.0", + "winapi 0.3.9", +] + [[package]] name = "libm" version = "0.2.1" @@ -4081,12 +4174,6 @@ version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" -[[package]] -name = "proc-macro-nested" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" - [[package]] name = "proc-macro2" version = "1.0.28" @@ -4558,6 +4645,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "repr_offset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "422498868d83340d45980a20132f29cbdcf8a84bf6e9e95b151a0b47f1288df4" +dependencies = [ + "rustc_version", +] + [[package]] name = "reqwest" version = "0.10.10" @@ -5374,7 +5470,7 @@ checksum = "b5c71ed3d54db0a699f4948e1bb3e45b450fa31fe602621dee6680361d569c88" dependencies = [ "base64 0.12.3", "bytes 0.5.6", - "futures 0.3.13", + "futures 0.3.19", "httparse", "log", "rand 0.7.3", @@ -6104,7 +6200,7 @@ dependencies = [ "backtrace", "cfg-if 1.0.0", "enum-as-inner", - "futures 0.3.13", + "futures 0.3.19", "idna", "lazy_static", "log", @@ -6123,7 +6219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "710f593b371175db53a26d0b38ed2978fafb9e9e8d3868b1acd753ea18df0ceb" dependencies = [ "cfg-if 0.1.10", - "futures 0.3.13", + "futures 0.3.19", "ipconfig", "lazy_static", "log", @@ -6141,6 +6237,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "typed-arena" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0685c84d5d54d1c26f7d3eb96cd41550adb97baed141a761cf335d3d33bcd0ae" + [[package]] name = "typed-headers" version = "0.2.0" @@ -6481,7 +6583,7 @@ dependencies = [ "derive_more", "ethabi", "ethereum-types", - "futures 0.3.13", + "futures 0.3.19", "futures-timer", "hex", "hyper 0.13.10", @@ -6626,7 +6728,7 @@ dependencies = [ "diesel", "diesel_migrations", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "hex", "lazy_static", "libsqlite3-sys", @@ -6652,6 +6754,18 @@ dependencies = [ "ya-service-bus", ] +[[package]] +name = "ya-agreement-utils" +version = "0.2.0" +dependencies = [ + "chrono", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "ya-client-model", +] + [[package]] name = "ya-agreement-utils" version = "0.2.1" @@ -6665,6 +6779,24 @@ dependencies = [ "ya-client-model", ] +[[package]] +name = "ya-builtin-negotiators" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "humantime 2.1.0", + "humantime-serde", + "log", + "log-derive", + "serde", + "serde_json", + "serde_yaml", + "ya-agreement-utils 0.2.0", + "ya-client-model", + "ya-negotiator-component", +] + [[package]] name = "ya-client" version = "0.5.3" @@ -6675,7 +6807,7 @@ dependencies = [ "bytes 0.5.6", "chrono", "envy", - "futures 0.3.13", + "futures 0.3.19", "heck", "hex", "log", @@ -6759,7 +6891,7 @@ dependencies = [ "anyhow", "bigdecimal 0.2.0", "chrono", - "futures 0.3.13", + "futures 0.3.19", "log", "maplit", "serde_json", @@ -6787,7 +6919,7 @@ dependencies = [ "env_logger 0.7.1", "ethabi", "ethereum-types", - "futures 0.3.13", + "futures 0.3.19", "hex", "lazy_static", "log", @@ -6827,7 +6959,7 @@ dependencies = [ "dotenv", "env_logger 0.7.1", "flexi_logger 0.19.4", - "futures 0.3.13", + "futures 0.3.19", "graphene-sgx", "hex", "ipnet", @@ -6853,7 +6985,7 @@ dependencies = [ "tokio-util 0.3.1", "url", "winapi 0.3.9", - "ya-agreement-utils", + "ya-agreement-utils 0.2.1", "ya-client-model", "ya-compile-time-utils", "ya-core-model", @@ -6894,7 +7026,7 @@ dependencies = [ "dotenv", "env_logger 0.7.1", "ethsign 0.8.0", - "futures 0.3.13", + "futures 0.3.19", "log", "promptly", "r2d2", @@ -6936,7 +7068,7 @@ dependencies = [ "diesel_migrations", "digest 0.8.1", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "humantime 2.1.0", "lazy_static", "libsqlite3-sys", @@ -6957,7 +7089,7 @@ dependencies = [ "thiserror", "tokio 0.2.25", "uuid", - "ya-agreement-utils", + "ya-agreement-utils 0.2.1", "ya-client", "ya-core-model", "ya-diesel-utils", @@ -6986,7 +7118,7 @@ dependencies = [ "semver 0.11.0", "serde_json", "thiserror", - "ya-agreement-utils", + "ya-agreement-utils 0.2.1", ] [[package]] @@ -6996,7 +7128,7 @@ dependencies = [ "actix-web", "anyhow", "bigdecimal 0.2.0", - "futures 0.3.13", + "futures 0.3.19", "lazy_static", "log", "metrics 0.12.1", @@ -7012,6 +7144,62 @@ dependencies = [ "ya-service-bus", ] +[[package]] +name = "ya-negotiator-component" +version = "0.1.0" +dependencies = [ + "anyhow", + "derive_more", + "lazy_static", + "log", + "log-derive", + "serde", + "serde_json", + "serde_yaml", + "ya-agreement-utils 0.2.0", + "ya-client-model", +] + +[[package]] +name = "ya-negotiator-shared-lib-interface" +version = "0.1.0" +dependencies = [ + "abi_stable", + "anyhow", + "lazy_static", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "ya-agreement-utils 0.2.0", + "ya-client-model", + "ya-negotiator-component", +] + +[[package]] +name = "ya-negotiators" +version = "0.1.0" +dependencies = [ + "actix", + "actix-rt", + "actix_derive", + "anyhow", + "derive_more", + "futures 0.3.19", + "humantime-serde", + "log", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "tokio 0.2.25", + "ya-agreement-utils 0.2.0", + "ya-builtin-negotiators", + "ya-client-model", + "ya-negotiator-component", + "ya-negotiator-shared-lib-interface", +] + [[package]] name = "ya-net" version = "0.2.0" @@ -7021,7 +7209,7 @@ dependencies = [ "bytes 0.5.6", "env_logger 0.7.1", "ethsign 0.8.0", - "futures 0.3.13", + "futures 0.3.19", "humantime 2.1.0", "lazy_static", "log", @@ -7063,7 +7251,7 @@ dependencies = [ "dotenv", "env_logger 0.7.1", "ethsign 0.7.3", - "futures 0.3.13", + "futures 0.3.19", "hex", "humantime 2.1.0", "lazy_static", @@ -7079,7 +7267,7 @@ dependencies = [ "thiserror", "tokio 0.2.25", "uuid", - "ya-agreement-utils", + "ya-agreement-utils 0.2.1", "ya-client", "ya-client-model", "ya-core-model", @@ -7109,7 +7297,7 @@ dependencies = [ "diesel_migrations", "ethereum-types", "ethsign 0.8.0", - "futures 0.3.13", + "futures 0.3.19", "hex", "log", "num-bigint 0.3.2", @@ -7165,9 +7353,10 @@ dependencies = [ "dialoguer", "directories", "dotenv", - "futures 0.3.13", + "futures 0.3.19", "futures-util", "humantime 2.1.0", + "humantime-serde", "lazy_static", "libc", "log", @@ -7179,6 +7368,7 @@ dependencies = [ "semver 0.11.0", "serde", "serde_json", + "serde_yaml", "shared_child", "signal-hook", "structopt", @@ -7190,12 +7380,13 @@ dependencies = [ "url", "walkdir", "winapi 0.3.9", - "ya-agreement-utils", + "ya-agreement-utils 0.2.0", "ya-client", "ya-client-model", "ya-compile-time-utils", "ya-core-model", "ya-file-logging", + "ya-negotiators", "ya-std-utils", "ya-utils-actix", "ya-utils-path", @@ -7212,7 +7403,7 @@ dependencies = [ "chrono", "derive_more", "env_logger 0.8.4", - "futures 0.3.13", + "futures 0.3.19", "log", "tokio 0.2.25", "url", @@ -7231,7 +7422,7 @@ dependencies = [ "chrono", "digest 0.9.0", "ethsign 0.8.0", - "futures 0.3.13", + "futures 0.3.19", "hex", "lazy_static", "log", @@ -7257,7 +7448,7 @@ dependencies = [ "anyhow", "bytes 0.5.6", "derive_more", - "futures 0.3.13", + "futures 0.3.19", "prost", "prost-build", "rand 0.8.4", @@ -7272,7 +7463,7 @@ version = "0.1.0" source = "git+https://github.com/golemfactory/ya-relay.git?rev=fbb2f4a5589890d2d3e8cef271fe647e0798d26d#fbb2f4a5589890d2d3e8cef271fe647e0798d26d" dependencies = [ "derive_more", - "futures 0.3.13", + "futures 0.3.19", "log", "managed", "rand 0.8.4", @@ -7288,7 +7479,7 @@ dependencies = [ "anyhow", "bytes 0.5.6", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "log", "prost", "prost-build", @@ -7326,7 +7517,7 @@ dependencies = [ "anyhow", "bitflags", "chrono", - "futures 0.3.13", + "futures 0.3.19", "lazy_static", "log", "parking_lot 0.11.1", @@ -7349,7 +7540,7 @@ checksum = "91deadd9d0598335eceb0b3b419f7531ba5aba9c0ca1459bca19beb2e6a473a4" dependencies = [ "actix", "bitflags", - "futures 0.3.13", + "futures 0.3.19", "pin-project 0.4.27", ] @@ -7371,7 +7562,7 @@ dependencies = [ name = "ya-service-api-cache" version = "0.1.0" dependencies = [ - "futures 0.3.13", + "futures 0.3.19", "log", ] @@ -7401,7 +7592,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", - "futures 0.3.13", + "futures 0.3.19", ] [[package]] @@ -7415,7 +7606,7 @@ dependencies = [ "anyhow", "awc", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "log", "serde", "structopt", @@ -7440,7 +7631,7 @@ checksum = "75b587455c11a7bbd9ec2a69d196c1d199d6e54696a923dba48a690e90a74c84" dependencies = [ "actix", "flexbuffers", - "futures 0.3.13", + "futures 0.3.19", "lazy_static", "log", "rand 0.7.3", @@ -7487,7 +7678,7 @@ dependencies = [ "awc", "bytes 0.5.6", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "gftp", "globset", "h2 0.2.7", @@ -7523,7 +7714,7 @@ dependencies = [ "actix-rt", "anyhow", "chrono", - "futures 0.3.13", + "futures 0.3.19", "log", "tokio 0.2.25", ] @@ -7532,7 +7723,7 @@ dependencies = [ name = "ya-utils-futures" version = "0.1.0" dependencies = [ - "futures 0.3.13", + "futures 0.3.19", "tokio 0.2.25", ] @@ -7541,7 +7732,7 @@ name = "ya-utils-networking" version = "0.1.1" dependencies = [ "anyhow", - "futures 0.3.13", + "futures 0.3.19", "ipnet", "log", "thiserror", @@ -7566,7 +7757,7 @@ dependencies = [ "anyhow", "derive_more", "fs2", - "futures 0.3.13", + "futures 0.3.19", "futures-util", "libc", "nix 0.22.0", @@ -7618,7 +7809,7 @@ dependencies = [ "anyhow", "bytes 0.5.6", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "hex", "ipnet", "lazy_static", @@ -7659,7 +7850,7 @@ dependencies = [ "chrono", "dotenv", "env_logger 0.7.1", - "futures 0.3.13", + "futures 0.3.19", "hex", "lazy_static", "log", @@ -7693,7 +7884,7 @@ dependencies = [ "chrono", "directories", "dotenv", - "futures 0.3.13", + "futures 0.3.19", "gftp", "lazy_static", "log", @@ -7920,7 +8111,7 @@ source = "git+https://github.com/matter-labs/zksync?rev=0e28e238f71b3be128e4a760 dependencies = [ "anyhow", "bigdecimal 0.2.0", - "futures 0.3.13", + "futures 0.3.19", "hex", "num", "serde", diff --git a/agent/provider/Cargo.toml b/agent/provider/Cargo.toml index 5dfc19872d..75ac48bb2a 100644 --- a/agent/provider/Cargo.toml +++ b/agent/provider/Cargo.toml @@ -13,7 +13,6 @@ name = "ya-provider" path = "src/main.rs" [dependencies] -ya-agreement-utils = { version = "^0.2"} ya-client = { version = "0.5", features = ['cli'] } ya-client-model = "0.3" ya-compile-time-utils = "0.2" @@ -23,6 +22,8 @@ ya-utils-actix = "0.1" ya-utils-path = "0.1" ya-utils-process = { version = "0.1", features = ['lock'] } ya-std-utils = "0.1" +ya-negotiators = { version = "0.1", path = "../../../../experiments/ya-negotiators"} +ya-agreement-utils = { version = "0.2", path = "../../../../experiments/ya-negotiators/agreement-utils"} actix = { version = "0.10", default-features = false } actix-rt = "1.1.1" @@ -39,6 +40,7 @@ dotenv = "0.15.0" futures = "0.3" futures-util = "0.3.4" humantime = "2.0.0" +humantime-serde = "1" lazy_static = "1.4.0" libc = "0.2" log = "0.4.8" @@ -49,6 +51,7 @@ path-clean = "0.1.0" semver = { version = "0.11", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +serde_yaml = "0.8" shared_child = "0.3.4" signal-hook = "0.1.13" structopt = "0.3.20" diff --git a/agent/provider/src/execution/task_runner.rs b/agent/provider/src/execution/task_runner.rs index 91567e06e8..f841f25999 100644 --- a/agent/provider/src/execution/task_runner.rs +++ b/agent/provider/src/execution/task_runner.rs @@ -384,7 +384,7 @@ impl TaskRunner { log::debug!("[TaskRunner] Got new Agreement: {}", msg.agreement); // Agreement waits for first create activity event. - let agreement_id = msg.agreement.agreement_id.clone(); + let agreement_id = msg.agreement.id.clone(); self.active_agreements.insert(agreement_id, msg.agreement); Ok(()) } diff --git a/agent/provider/src/market/config.rs b/agent/provider/src/market/config.rs index e100dfccd2..4ae3540b2f 100644 --- a/agent/provider/src/market/config.rs +++ b/agent/provider/src/market/config.rs @@ -1,6 +1,12 @@ +use std::path::PathBuf; use structopt::StructOpt; -use crate::market::negotiator::factory::NegotiatorsConfig; +use crate::startup_config::DEFAULT_DATA_DIR; +use crate::startup_config::DEFAULT_PLUGINS_DIR; + +lazy_static::lazy_static! { + pub static ref DEFAULT_NEGOTIATORS_WORKDIR_DIR: PathBuf = default_negotiators_workdir(); +} /// Configuration for ProviderMarket actor. #[derive(StructOpt, Clone, Debug)] @@ -11,10 +17,20 @@ pub struct MarketConfig { pub negotiation_events_interval: f32, #[structopt(long, env, default_value = "10.0")] pub agreement_approve_timeout: f32, - #[structopt(long, env, default_value = "Composite")] - pub negotiator_type: String, - #[structopt(flatten)] - pub negotiator_config: NegotiatorsConfig, #[structopt(skip = "you-forgot-to-set-session-id")] pub session_id: String, + /// Relative to Provider DataDir + #[structopt(long, env, default_value = "negotiations")] + pub negotiators_workdir: String, + /// Uses ExeUnit plugins directory by default + #[structopt( + long, + default_value_os = DEFAULT_PLUGINS_DIR.as_ref(), + required = false, + )] + pub negotiators_plugins: PathBuf, +} + +fn default_negotiators_workdir() -> PathBuf { + PathBuf::from(&*DEFAULT_DATA_DIR).join("negotiations") } diff --git a/agent/provider/src/market/negotiator.rs b/agent/provider/src/market/negotiator.rs index 70c72c885c..b7bcb0ba6c 100644 --- a/agent/provider/src/market/negotiator.rs +++ b/agent/provider/src/market/negotiator.rs @@ -1,15 +1,4 @@ -mod accept_all; pub mod builtin; mod common; -mod component; -mod composite; -pub mod factory; -pub use accept_all::AcceptAllNegotiator; -pub use composite::CompositeNegotiator; - -pub use common::{ - AgreementResponse, AgreementResult, Negotiator, NegotiatorAddr, ProposalResponse, -}; - -pub use component::{NegotiationResult, NegotiatorComponent, NegotiatorsPack, ProposalView}; +pub use common::AgreementResult; diff --git a/agent/provider/src/market/negotiator/accept_all.rs b/agent/provider/src/market/negotiator/accept_all.rs deleted file mode 100644 index 64ccd9be9c..0000000000 --- a/agent/provider/src/market/negotiator/accept_all.rs +++ /dev/null @@ -1,55 +0,0 @@ -use actix::{Actor, Context, Handler}; - -use ya_client::model::market::NewOffer; - -use super::common::offer_definition_to_offer; -use super::common::{AgreementResponse, Negotiator, ProposalResponse}; -use crate::market::negotiator::common::{ - AgreementFinalized, CreateOffer, ReactToAgreement, ReactToProposal, -}; - -#[derive(Debug)] -pub struct AcceptAllNegotiator; - -impl AcceptAllNegotiator { - pub fn new() -> AcceptAllNegotiator { - AcceptAllNegotiator {} - } -} - -impl Handler for AcceptAllNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, msg: CreateOffer, _: &mut Context) -> Self::Result { - Ok(offer_definition_to_offer(msg.offer_definition.clone())) - } -} - -impl Handler for AcceptAllNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, _: ReactToProposal, _: &mut Context) -> Self::Result { - Ok(ProposalResponse::AcceptProposal) - } -} - -impl Handler for AcceptAllNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, _: ReactToAgreement, _: &mut Context) -> Self::Result { - Ok(AgreementResponse::ApproveAgreement) - } -} - -impl Handler for AcceptAllNegotiator { - type Result = anyhow::Result<()>; - - fn handle(&mut self, _: AgreementFinalized, _: &mut Context) -> Self::Result { - Ok(()) - } -} - -impl Negotiator for AcceptAllNegotiator {} -impl Actor for AcceptAllNegotiator { - type Context = Context; -} diff --git a/agent/provider/src/market/negotiator/builtin.rs b/agent/provider/src/market/negotiator/builtin.rs index fd514deae7..9f7fcc44bb 100644 --- a/agent/provider/src/market/negotiator/builtin.rs +++ b/agent/provider/src/market/negotiator/builtin.rs @@ -3,3 +3,23 @@ pub mod max_agreements; pub use expiration::LimitExpiration; pub use max_agreements::MaxAgreements; + +use ya_negotiators::component::register_negotiator; +use ya_negotiators::NegotiatorComponent; + +pub fn register_negotiators() { + register_negotiator( + "ya-provider", + "LimitExpiration", + Box::new(|config, _| { + Ok(Box::new(LimitExpiration::new(config)?) as Box) + }), + ); + register_negotiator( + "ya-provider", + "LimitAgreements", + Box::new(|config, _| { + Ok(Box::new(MaxAgreements::new(config)?) as Box) + }), + ); +} diff --git a/agent/provider/src/market/negotiator/builtin/expiration.rs b/agent/provider/src/market/negotiator/builtin/expiration.rs index c91dec6d85..01cc448c5a 100644 --- a/agent/provider/src/market/negotiator/builtin/expiration.rs +++ b/agent/provider/src/market/negotiator/builtin/expiration.rs @@ -1,13 +1,15 @@ use anyhow::{anyhow, Result}; use chrono::{DateTime, Duration, TimeZone, Utc}; +use humantime; +use serde::{Deserialize, Serialize}; +use structopt::StructOpt; -use ya_agreement_utils::{Error, OfferDefinition}; +use ya_agreement_utils::{Error, OfferTemplate, ProposalView}; +use ya_negotiators::component::{RejectReason, Score}; +use ya_negotiators::factory::{LoadMode, NegotiatorConfig}; +use ya_negotiators::{NegotiationResult, NegotiatorComponent}; use crate::display::EnableDisplay; -use crate::market::negotiator::factory::AgreementExpirationNegotiatorConfig; -use crate::market::negotiator::{ - AgreementResult, NegotiationResult, NegotiatorComponent, ProposalView, -}; /// Negotiator that can reject Requestors, that request too long Agreement /// expiration time. Expiration limit can be different in case, when Requestor @@ -37,8 +39,26 @@ pub static DEBIT_NOTE_ACCEPT_TIMEOUT_PROPERTY_FLAT: &'static str = #[allow(dead_code)] pub static AGREEMENT_EXPIRATION_PROPERTY_FLAT: &'static str = "golem.srv.comp.expiration"; +/// Configuration for LimitAgreements Negotiator. +#[derive(StructOpt, Clone, Debug, Serialize, Deserialize)] +pub struct Config { + #[serde(with = "humantime_serde")] + #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "5min")] + pub min_agreement_expiration: std::time::Duration, + #[serde(with = "humantime_serde")] + #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "10h")] + pub max_agreement_expiration: std::time::Duration, + #[serde(with = "humantime_serde")] + #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "30min")] + pub max_agreement_expiration_without_deadline: std::time::Duration, + #[serde(with = "humantime_serde")] + #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "4min")] + pub debit_note_acceptance_deadline: std::time::Duration, +} + impl LimitExpiration { - pub fn new(config: &AgreementExpirationNegotiatorConfig) -> anyhow::Result { + pub fn new(config: serde_yaml::Value) -> anyhow::Result { + let config: Config = serde_yaml::from_value(config)?; let component = LimitExpiration { min_expiration: chrono::Duration::from_std(config.min_agreement_expiration)?, max_expiration: chrono::Duration::from_std(config.max_agreement_expiration)?, @@ -84,12 +104,13 @@ fn debit_deadline_from(proposal: &ProposalView) -> Result> { impl NegotiatorComponent for LimitExpiration { fn negotiate_step( &mut self, - demand: &ProposalView, - mut offer: ProposalView, + their: &ProposalView, + mut ours: ProposalView, + score: Score, ) -> anyhow::Result { - let req_deadline = debit_deadline_from(demand)?; - let our_deadline = debit_deadline_from(&offer)?; - let req_expiration = proposal_expiration_from(&demand)?; + let req_deadline = debit_deadline_from(their)?; + let our_deadline = debit_deadline_from(&ours)?; + let req_expiration = proposal_expiration_from(&their)?; // Let's check if Requestor is able to accept DebitNotes. let max_expiration_delta = match &req_deadline { @@ -106,16 +127,16 @@ impl NegotiatorComponent for LimitExpiration { if too_soon || too_late { log::info!( "Negotiator: Reject proposal [{}] due to expiration limits.", - demand.agreement_id + their.id ); return Ok(NegotiationResult::Reject { - message: format!( + reason: RejectReason::new(format!( "Proposal expires at: {} which is less than {} or more than {} from now", req_expiration, self.min_expiration.display(), max_expiration_delta.display() - ), + )), is_final: too_late, // when it's too soon we could try later }); }; @@ -127,68 +148,83 @@ impl NegotiatorComponent for LimitExpiration { (Some(req_deadline), Some(our_deadline)) => { if req_deadline > our_deadline { NegotiationResult::Reject { - message: format!( + reason: RejectReason::new(format!( "DebitNote acceptance deadline should be less than {}.", self.payment_deadline.display() - ), + )), is_final: true, } } else if req_deadline == our_deadline { // We agree with Requestor to the same deadline. - NegotiationResult::Ready { offer } + NegotiationResult::Ready { + proposal: ours, + score, + } } else { // Below certain timeout it is impossible for Requestor to accept DebitNotes. if req_deadline.num_seconds() < self.min_deadline { return Ok(NegotiationResult::Reject { - message: format!( + reason: RejectReason::new(format!( "To low DebitNotes timeout: {}", req_deadline.display() - ), + )), is_final: true, }); } // Requestor proposed better deadline, than we required. // We are expected to set property to the same value if we agree. - let deadline_prop = offer + let deadline_prop = ours .pointer_mut(DEBIT_NOTE_ACCEPT_TIMEOUT_PROPERTY) .unwrap(); *deadline_prop = serde_json::Value::Number(req_deadline.num_seconds().into()); // Since we changed our proposal, we can't return `Ready`. - NegotiationResult::Negotiating { offer } + NegotiationResult::Negotiating { + proposal: ours, + score, + } } } // Requestor doesn't support DebitNotes acceptance, so we should // remove our property from Proposal to match with his. (None, Some(_)) => { - offer.remove_property(DEBIT_NOTE_ACCEPT_TIMEOUT_PROPERTY)?; - NegotiationResult::Negotiating { offer } + ours.remove_property(DEBIT_NOTE_ACCEPT_TIMEOUT_PROPERTY)?; + NegotiationResult::Negotiating { + proposal: ours, + score, + } } // We agree with Requestor, that he won't accept DebitNotes. - (None, None) => NegotiationResult::Ready { offer }, + (None, None) => NegotiationResult::Ready { + proposal: ours, + score, + }, _ => return Err(anyhow!("Shouldn't be in this state.")), }) } - fn fill_template(&mut self, mut template: OfferDefinition) -> anyhow::Result { - template.offer.set_property( + fn fill_template(&mut self, mut template: OfferTemplate) -> anyhow::Result { + template.set_property( DEBIT_NOTE_ACCEPT_TIMEOUT_PROPERTY_FLAT, serde_json::Value::Number(self.payment_deadline.num_seconds().into()), ); Ok(template) } +} - fn on_agreement_terminated( - &mut self, - _agreement_id: &str, - _result: &AgreementResult, - ) -> anyhow::Result<()> { - Ok(()) - } - - fn on_agreement_approved(&mut self, _agreement_id: &str) -> anyhow::Result<()> { - Ok(()) +impl Config { + pub fn from_env() -> anyhow::Result { + // Empty command line arguments, because we want to use ENV fallback + // or default values if ENV variables are not set. + let config = Config::from_iter_safe(&[""])?; + Ok(NegotiatorConfig { + name: "LimitExpiration".to_string(), + load_mode: LoadMode::StaticLib { + library: "ya-provider".to_string(), + }, + params: serde_yaml::to_value(&config)?, + }) } } @@ -196,42 +232,57 @@ impl NegotiatorComponent for LimitExpiration { mod test_expiration_negotiator { use super::*; use ya_agreement_utils::agreement::expand; - use ya_agreement_utils::{InfNodeInfo, NodeInfo, OfferTemplate, ServiceInfo}; + use ya_agreement_utils::{InfNodeInfo, NodeInfo, OfferDefinition, OfferTemplate, ServiceInfo}; + use ya_client_model::market::proposal::State; - fn expiration_config() -> AgreementExpirationNegotiatorConfig { - AgreementExpirationNegotiatorConfig { + fn expiration_config() -> serde_yaml::Value { + serde_yaml::to_value(&Config { min_agreement_expiration: std::time::Duration::from_secs(5 * 60), max_agreement_expiration: std::time::Duration::from_secs(30 * 60), max_agreement_expiration_without_deadline: std::time::Duration::from_secs(10 * 60), debit_note_acceptance_deadline: std::time::Duration::from_secs(120), - } + }) + .unwrap() } fn properties_to_proposal(value: serde_json::Value) -> ProposalView { ProposalView { - agreement_id: "2332850934yer".to_string(), - json: expand(value), + content: OfferTemplate { + properties: expand(value), + constraints: "()".to_string(), + }, + id: "2332850934yer".to_string(), + issuer: Default::default(), + state: State::Initial, + timestamp: Utc::now(), } } - fn example_offer() -> OfferDefinition { + fn example_offer() -> OfferTemplate { OfferDefinition { node_info: NodeInfo::with_name("nanana"), srv_info: ServiceInfo::new(InfNodeInfo::default(), serde_json::Value::Null), com_info: Default::default(), offer: OfferTemplate::default(), } + .into_template() } trait ToProposal { fn to_proposal(self) -> ProposalView; } - impl ToProposal for OfferDefinition { + impl ToProposal for OfferTemplate { fn to_proposal(self) -> ProposalView { ProposalView { - agreement_id: "sagdshgdfgd".to_string(), - json: expand(self.into_json()), + content: OfferTemplate { + properties: expand(self.properties), + constraints: self.constraints, + }, + id: "sagdshgdfgd".to_string(), + issuer: Default::default(), + state: State::Initial, + timestamp: Utc::now(), } } } @@ -242,7 +293,7 @@ mod test_expiration_negotiator { #[test] fn test_lower_deadline() { let config = expiration_config(); - let mut negotiator = LimitExpiration::new(&config).unwrap(); + let mut negotiator = LimitExpiration::new(config).unwrap(); let offer_proposal = negotiator .fill_template(example_offer()) @@ -255,11 +306,13 @@ mod test_expiration_negotiator { })); match negotiator - .negotiate_step(&proposal, offer_proposal) + .negotiate_step(&proposal, offer_proposal, Score::default()) .unwrap() { // Negotiator is expected to take better proposal and change adjust property. - NegotiationResult::Negotiating { offer } => { + NegotiationResult::Negotiating { + proposal: offer, .. + } => { assert_eq!( debit_deadline_from(&offer).unwrap().unwrap(), Duration::seconds(50) @@ -273,7 +326,7 @@ mod test_expiration_negotiator { #[test] fn test_greater_deadline() { let config = expiration_config(); - let mut negotiator = LimitExpiration::new(&config).unwrap(); + let mut negotiator = LimitExpiration::new(config).unwrap(); let offer_proposal = negotiator .fill_template(example_offer()) @@ -286,11 +339,13 @@ mod test_expiration_negotiator { })); match negotiator - .negotiate_step(&proposal, offer_proposal) + .negotiate_step(&proposal, offer_proposal, Score::default()) .unwrap() { - NegotiationResult::Reject { message, is_final } => { - assert!(message.contains("DebitNote acceptance deadline should be less than")); + NegotiationResult::Reject { reason, is_final } => { + assert!(reason + .message + .contains("DebitNote acceptance deadline should be less than")); assert!(is_final) } result => panic!("Expected NegotiationResult::Reject. Got: {:?}", result), @@ -302,7 +357,7 @@ mod test_expiration_negotiator { #[test] fn test_equal_deadline() { let config = expiration_config(); - let mut negotiator = LimitExpiration::new(&config).unwrap(); + let mut negotiator = LimitExpiration::new(config).unwrap(); let offer_proposal = negotiator .fill_template(example_offer()) @@ -315,10 +370,12 @@ mod test_expiration_negotiator { })); match negotiator - .negotiate_step(&proposal, offer_proposal) + .negotiate_step(&proposal, offer_proposal, Score::default()) .unwrap() { - NegotiationResult::Ready { offer } => { + NegotiationResult::Ready { + proposal: offer, .. + } => { assert_eq!( debit_deadline_from(&offer).unwrap().unwrap(), Duration::seconds(120) @@ -335,7 +392,7 @@ mod test_expiration_negotiator { #[test] fn test_requestor_doesnt_accept_debit_notes_to_high_expiration() { let config = expiration_config(); - let mut negotiator = LimitExpiration::new(&config).unwrap(); + let mut negotiator = LimitExpiration::new(config).unwrap(); let offer_proposal = negotiator .fill_template(example_offer()) @@ -347,11 +404,11 @@ mod test_expiration_negotiator { })); match negotiator - .negotiate_step(&proposal, offer_proposal) + .negotiate_step(&proposal, offer_proposal, Score::default()) .unwrap() { - NegotiationResult::Reject { message, is_final } => { - assert!(message.contains("Proposal expires at")); + NegotiationResult::Reject { reason, is_final } => { + assert!(reason.message.contains("Proposal expires at")); assert!(!is_final) } result => panic!("Expected NegotiationResult::Reject. Got: {:?}", result), @@ -365,7 +422,7 @@ mod test_expiration_negotiator { #[test] fn test_requestor_doesnt_accept_debit_notes_expiration_ok() { let config = expiration_config(); - let mut negotiator = LimitExpiration::new(&config).unwrap(); + let mut negotiator = LimitExpiration::new(config).unwrap(); let offer_proposal = negotiator .fill_template(example_offer()) @@ -377,10 +434,12 @@ mod test_expiration_negotiator { })); match negotiator - .negotiate_step(&proposal, offer_proposal) + .negotiate_step(&proposal, offer_proposal, Score::default()) .unwrap() { - NegotiationResult::Negotiating { offer } => { + NegotiationResult::Negotiating { + proposal: offer, .. + } => { assert!(debit_deadline_from(&offer).unwrap().is_none()) } result => panic!("Expected NegotiationResult::Negotiating. Got: {:?}", result), diff --git a/agent/provider/src/market/negotiator/builtin/max_agreements.rs b/agent/provider/src/market/negotiator/builtin/max_agreements.rs index 428722647d..3997d9840c 100644 --- a/agent/provider/src/market/negotiator/builtin/max_agreements.rs +++ b/agent/provider/src/market/negotiator/builtin/max_agreements.rs @@ -1,12 +1,19 @@ use anyhow::bail; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use structopt::StructOpt; -use ya_agreement_utils::OfferDefinition; +use ya_agreement_utils::{AgreementView, ProposalView}; +use ya_negotiators::component::{RejectReason, Score}; +use ya_negotiators::factory::{LoadMode, NegotiatorConfig}; +use ya_negotiators::{AgreementResult, NegotiationResult, NegotiatorComponent}; -use crate::market::negotiator::factory::LimitAgreementsNegotiatorConfig; -use crate::market::negotiator::{ - AgreementResult, NegotiationResult, NegotiatorComponent, ProposalView, -}; +/// Configuration for LimitAgreements Negotiator. +#[derive(StructOpt, Clone, Debug, Serialize, Deserialize)] +pub struct Config { + #[structopt(long, env, default_value = "1")] + pub max_simultaneous_agreements: u32, +} /// Negotiator that can limit number of running agreements. pub struct MaxAgreements { @@ -15,11 +22,12 @@ pub struct MaxAgreements { } impl MaxAgreements { - pub fn new(config: &LimitAgreementsNegotiatorConfig) -> MaxAgreements { - MaxAgreements { + pub fn new(config: serde_yaml::Value) -> anyhow::Result { + let config: Config = serde_yaml::from_value(config)?; + Ok(MaxAgreements { max_agreements: config.max_simultaneous_agreements, active_agreements: HashSet::new(), - } + }) } pub fn has_free_slot(&self) -> bool { @@ -32,31 +40,28 @@ impl NegotiatorComponent for MaxAgreements { &mut self, demand: &ProposalView, offer: ProposalView, + score: Score, ) -> anyhow::Result { if self.has_free_slot() { - Ok(NegotiationResult::Ready { offer }) + Ok(NegotiationResult::Ready { + proposal: offer, + score, + }) } else { log::info!( "'MaxAgreements' negotiator: Reject proposal [{}] due to limit.", - demand.agreement_id, // TODO: Should be just `id`, but I reuse AgreementView struct. + demand.id, ); Ok(NegotiationResult::Reject { - message: format!( + reason: RejectReason::new(format!( "No capacity available. Reached Agreements limit: {}", self.max_agreements - ), + )), is_final: false, }) } } - fn fill_template( - &mut self, - offer_template: OfferDefinition, - ) -> anyhow::Result { - Ok(offer_template) - } - fn on_agreement_terminated( &mut self, agreement_id: &str, @@ -69,16 +74,31 @@ impl NegotiatorComponent for MaxAgreements { Ok(()) } - fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()> { + fn on_agreement_approved(&mut self, agreement: &AgreementView) -> anyhow::Result<()> { if self.has_free_slot() { - self.active_agreements.insert(agreement_id.to_string()); + self.active_agreements.insert(agreement.id.to_string()); Ok(()) } else { - self.active_agreements.insert(agreement_id.to_string()); + self.active_agreements.insert(agreement.id.to_string()); bail!( "Agreement [{}] approved despite not available capacity.", - agreement_id + agreement.id ) } } } + +impl Config { + pub fn from_env() -> anyhow::Result { + // Empty command line arguments, because we want to use ENV fallback + // or default values if ENV variables are not set. + let config = Config::from_iter_safe(&[""])?; + Ok(NegotiatorConfig { + name: "LimitAgreements".to_string(), + load_mode: LoadMode::StaticLib { + library: "ya-provider".to_string(), + }, + params: serde_yaml::to_value(&config)?, + }) + } +} diff --git a/agent/provider/src/market/negotiator/common.rs b/agent/provider/src/market/negotiator/common.rs index 809621c18c..ef647dfe6d 100644 --- a/agent/provider/src/market/negotiator/common.rs +++ b/agent/provider/src/market/negotiator/common.rs @@ -1,45 +1,5 @@ -use actix::prelude::*; -use actix::{Actor, Handler}; -use anyhow::Result; -use derive_more::Display; - -use ya_agreement_utils::{AgreementView, OfferDefinition}; -use ya_client::model::market::Reason; -use ya_client::model::market::{NewOffer, Proposal}; - -use crate::display::EnableDisplay; use crate::market::termination_reason::BreakReason; -/// Response for requestor proposals. -#[derive(Debug, Display)] -#[allow(dead_code)] -pub enum ProposalResponse { - #[display(fmt = "CounterProposal")] - CounterProposal { - offer: NewOffer, - }, - AcceptProposal, - #[display(fmt = "RejectProposal (reason: {})", "reason.display()")] - RejectProposal { - reason: Option, - is_final: bool, - }, - ///< Don't send any message to requestor. Could be useful to wait for other offers. - IgnoreProposal, -} - -/// Response for requestor agreements. -#[derive(Debug, Display)] -#[allow(dead_code)] -pub enum AgreementResponse { - ApproveAgreement, - #[display(fmt = "RejectAgreement (reason: {})", "reason.display()")] - RejectAgreement { - reason: Option, - is_final: bool, - }, -} - /// Result of agreement execution. #[derive(Clone)] pub enum AgreementResult { @@ -52,176 +12,3 @@ pub enum AgreementResult { /// Agreement was broken by us. Broken { reason: BreakReason }, } - -// =========================================== // -// Negotiator interface -// =========================================== // - -/// Negotiator can modify offer, that was generated for him. He can save -/// information about this offer, that are necessary for negotiations. -#[derive(Message)] -#[rtype(result = "Result")] -pub struct CreateOffer { - pub offer_definition: OfferDefinition, -} - -/// Reactions to events from market. These function make market decisions -/// related to incoming Proposals. -#[derive(Message)] -#[rtype(result = "Result")] -pub struct ReactToProposal { - pub prev_proposal: Proposal, - pub demand: Proposal, -} - -/// Reactions to events from market. These function make market decisions -/// related to incoming Agreements. -#[derive(Message)] -#[rtype(result = "Result")] -pub struct ReactToAgreement { - pub agreement: AgreementView, -} - -/// Agreement finished notifications. Negotiator can adjust his strategy based on it. -#[derive(Message)] -#[rtype(result = "Result<()>")] -pub struct AgreementFinalized { - pub agreement_id: String, - pub result: AgreementResult, -} - -/// Actor implementing Negotiation logic. -/// -/// Direction: -/// - Negotiator should asynchronously generate negotiation decisions instead -/// of returning them as direct response to incoming events. This would allow use -/// to implement time dependent logic like: Collect Proposals during `n` seconds -/// and choose the best from them. -/// - Extensibility: we expect, that developers will implement different market strategies. -/// In best case they should be able to do this without modifying `ya-provider` code. -/// This mean we should implement plugin-like system to communicate with external applications/code. -/// - Multiple negotiating plugins cooperating with each other. Note that introducing new features to -/// Agreement specification requires implementing separate negotiation logic. In this case we -/// can end up with explosion of combination to implement. What worse, we will force external -/// developers to adjust their logic to new Agreement features each time, when they appear. -/// To avoid this we should design internal interfaces, which will allow to combine multiple logics -/// as plugable components. -pub trait Negotiator: - Actor - + Handler::Result> - + Handler::Result> - + Handler::Result> - + Handler::Result> -{ -} - -pub fn offer_definition_to_offer(offer_def: OfferDefinition) -> NewOffer { - let constraints = offer_def.offer.constraints.clone(); - NewOffer::new(offer_def.into_json(), constraints) -} - -#[derive(Clone)] -pub struct NegotiatorAddr { - pub on_create: Recipient, - pub on_finalized: Recipient, - pub on_proposal: Recipient, - pub on_agreement: Recipient, -} - -impl NegotiatorAddr { - pub async fn create_offer(&self, offer_definition: &OfferDefinition) -> Result { - self.on_create - .send(CreateOffer { - offer_definition: offer_definition.clone(), - }) - .await? - } - - pub async fn react_to_proposal( - &self, - prev_proposal: Proposal, - demand: Proposal, - ) -> Result { - self.on_proposal - .send(ReactToProposal { - demand, - prev_proposal, - }) - .await? - } - - pub async fn react_to_agreement( - &self, - agreement_view: &AgreementView, - ) -> Result { - self.on_agreement - .send(ReactToAgreement { - agreement: agreement_view.clone(), - }) - .await? - } - - pub async fn agreement_finalized( - &self, - agreement_id: &str, - result: AgreementResult, - ) -> Result<()> { - self.on_finalized - .send(AgreementFinalized { - agreement_id: agreement_id.to_string(), - result, - }) - .await? - } - - pub fn from>>(negotiator: T) -> NegotiatorAddr { - let addr = negotiator.start(); - NegotiatorAddr { - on_create: addr.clone().recipient(), - on_finalized: addr.clone().recipient(), - on_proposal: addr.clone().recipient(), - on_agreement: addr.recipient(), - } - } -} - -pub fn reason_with_extra(message: String, extra: serde_json::Value) -> Reason { - let mut reason = Reason::new(message); - reason.extra = extra; - reason -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_proposal_response_display() { - let reason = ProposalResponse::RejectProposal { - reason: Some("zima".into()), - is_final: false, - }; - let no_reason = ProposalResponse::RejectProposal { - reason: None, - is_final: false, - }; - - assert_eq!(reason.to_string(), "RejectProposal (reason: 'zima')"); - assert_eq!(no_reason.to_string(), "RejectProposal (reason: None)"); - } - - #[test] - fn test_agreement_response_display() { - let reason = AgreementResponse::RejectAgreement { - reason: Some("lato".into()), - is_final: false, - }; - let no_reason = AgreementResponse::RejectAgreement { - reason: None, - is_final: false, - }; - - assert_eq!(reason.to_string(), "RejectAgreement (reason: 'lato')"); - assert_eq!(no_reason.to_string(), "RejectAgreement (reason: None)"); - } -} diff --git a/agent/provider/src/market/negotiator/component.rs b/agent/provider/src/market/negotiator/component.rs deleted file mode 100644 index 1202b3e483..0000000000 --- a/agent/provider/src/market/negotiator/component.rs +++ /dev/null @@ -1,170 +0,0 @@ -use anyhow::anyhow; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; - -use ya_agreement_utils::{AgreementView, OfferDefinition}; - -use crate::market::negotiator::AgreementResult; - -pub type ProposalView = AgreementView; - -/// Result returned by `NegotiatorComponent` during Proposals evaluation. -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum NegotiationResult { - /// `NegotiatorComponent` fully negotiated his part of Proposal, - /// and it can be turned into valid Agreement. Provider will send - /// counter Proposal. - Ready { offer: ProposalView }, - /// Proposal is not ready to become Agreement, but negotiations - /// are in progress. - Negotiating { offer: ProposalView }, - /// Proposal is not acceptable and should be rejected. - /// Negotiations can't be continued. - Reject { message: String, is_final: bool }, -} - -/// `NegotiatorComponent` implements negotiation logic for part of Agreement -/// specification. Components should be as granular as possible to allow composition -/// with other Components. -/// -/// Future goal is to allow developers to create their own specifications and implement -/// components, that are able to negotiate this specification. -/// It would be useful to have `NegotiatorComponent`, that can be loaded from shared library -/// or can communicate with negotiation logic in external process (maybe RPC or TCP??). -pub trait NegotiatorComponent { - /// Push forward negotiations as far as you can. - /// `NegotiatorComponent` should modify only properties in his responsibility - /// and return remaining part of Proposal unchanged. - fn negotiate_step( - &mut self, - demand: &ProposalView, - offer: ProposalView, - ) -> anyhow::Result; - - /// Called during Offer creation. `NegotiatorComponent` should add properties - /// and constraints for which it is responsible during future negotiations. - fn fill_template(&mut self, offer_template: OfferDefinition) - -> anyhow::Result; - - /// Called when Agreement was finished. `NegotiatorComponent` can use termination - /// result to adjust his future negotiation strategy. - fn on_agreement_terminated( - &mut self, - agreement_id: &str, - result: &AgreementResult, - ) -> anyhow::Result<()>; - - /// Called when Negotiator decided to approve Agreement. It's only notification, - /// `NegotiatorComponent` can't reject Agreement anymore. - fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()>; -} - -pub struct NegotiatorsPack { - components: HashMap>, -} - -impl NegotiatorsPack { - pub fn new() -> NegotiatorsPack { - NegotiatorsPack { - components: HashMap::new(), - } - } - - pub fn add_component( - mut self, - name: &str, - component: Box, - ) -> NegotiatorsPack { - self.components.insert(name.to_string(), component); - self - } -} - -impl NegotiatorComponent for NegotiatorsPack { - fn negotiate_step( - &mut self, - demand: &ProposalView, - mut offer: ProposalView, - ) -> anyhow::Result { - let mut all_ready = true; - for (name, component) in &mut self.components { - let result = component.negotiate_step(demand, offer)?; - offer = match result { - NegotiationResult::Ready { offer } => offer, - NegotiationResult::Negotiating { offer } => { - log::info!( - "Negotiator component '{}' is still negotiating Proposal [{}].", - name, - demand.agreement_id - ); - all_ready = false; - offer - } - NegotiationResult::Reject { message, is_final } => { - return Ok(NegotiationResult::Reject { message, is_final }) - } - } - } - - // Full negotiations is ready only, if all `NegotiatorComponent` returned - // ready state. Otherwise we must still continue negotiations. - Ok(match all_ready { - true => NegotiationResult::Ready { offer }, - false => NegotiationResult::Negotiating { offer }, - }) - } - - fn fill_template( - &mut self, - mut offer_template: OfferDefinition, - ) -> anyhow::Result { - for (name, component) in &mut self.components { - offer_template = component.fill_template(offer_template).map_err(|e| { - anyhow!( - "Negotiator component '{}' failed filling Offer template. {}", - name, - e - ) - })?; - } - Ok(offer_template) - } - - fn on_agreement_terminated( - &mut self, - agreement_id: &str, - result: &AgreementResult, - ) -> anyhow::Result<()> { - for (name, component) in &mut self.components { - component - .on_agreement_terminated(agreement_id, result) - .map_err(|e| { - log::warn!( - "Negotiator component '{}' failed handling Agreement [{}] termination. {}", - name, - agreement_id, - e - ) - }) - .ok(); - } - Ok(()) - } - - fn on_agreement_approved(&mut self, agreement_id: &str) -> anyhow::Result<()> { - for (name, component) in &mut self.components { - component - .on_agreement_approved(agreement_id) - .map_err(|e| { - log::warn!( - "Negotiator component '{}' failed handling Agreement [{}] approval. {}", - name, - agreement_id, - e - ) - }) - .ok(); - } - Ok(()) - } -} diff --git a/agent/provider/src/market/negotiator/composite.rs b/agent/provider/src/market/negotiator/composite.rs deleted file mode 100644 index 05d99c6b84..0000000000 --- a/agent/provider/src/market/negotiator/composite.rs +++ /dev/null @@ -1,176 +0,0 @@ -use actix::{Actor, Addr, Context, Handler}; -use anyhow::anyhow; -use serde_json::Value; - -use ya_agreement_utils::agreement::{expand, flatten_value}; -use ya_agreement_utils::AgreementView; -use ya_client::model::market::NewOffer; - -use super::builtin::{LimitExpiration, MaxAgreements}; -use super::common::{offer_definition_to_offer, AgreementResponse, Negotiator, ProposalResponse}; -use super::{NegotiationResult, NegotiatorsPack}; -use crate::market::negotiator::common::{ - reason_with_extra, AgreementFinalized, CreateOffer, ReactToAgreement, ReactToProposal, -}; -use crate::market::negotiator::factory::CompositeNegotiatorConfig; -use crate::market::negotiator::{NegotiatorComponent, ProposalView}; -use crate::market::ProviderMarket; - -/// Negotiator that can limit number of running agreements. -pub struct CompositeNegotiator { - components: NegotiatorsPack, -} - -impl CompositeNegotiator { - pub fn new( - _market: Addr, - config: &CompositeNegotiatorConfig, - ) -> anyhow::Result { - let components = NegotiatorsPack::new() - .add_component( - "LimitAgreements", - Box::new(MaxAgreements::new(&config.limit_agreements_config)), - ) - .add_component( - "LimitExpiration", - Box::new(LimitExpiration::new(&config.expire_agreements_config)?), - ); - - Ok(CompositeNegotiator { components }) - } -} - -impl Handler for CompositeNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, msg: CreateOffer, _: &mut Context) -> Self::Result { - let offer = self.components.fill_template(msg.offer_definition)?; - Ok(offer_definition_to_offer(offer)) - } -} - -impl Handler for CompositeNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, msg: ReactToProposal, _: &mut Context) -> Self::Result { - // In current implementation we don't allow to change constraints, so we take - // them from initial Offer. - let constraints = msg.prev_proposal.constraints; - let proposal = ProposalView { - agreement_id: msg.demand.proposal_id, - json: expand(msg.demand.properties), - }; - - let offer_proposal = ProposalView { - json: expand(msg.prev_proposal.properties), - agreement_id: msg.prev_proposal.proposal_id, - }; - - let result = self.components.negotiate_step(&proposal, offer_proposal)?; - match result { - NegotiationResult::Reject { message, is_final } => { - Ok(ProposalResponse::RejectProposal { - reason: Some(reason_with_extra( - message, - serde_json::json!({ "golem.proposal.rejection.is-final": is_final }), - )), - is_final, - }) - } - NegotiationResult::Ready { offer } | NegotiationResult::Negotiating { offer } => { - let offer = NewOffer { - properties: flatten_value(offer.json), - constraints, - }; - Ok(ProposalResponse::CounterProposal { offer }) - } - } - } -} - -pub fn to_proposal_views( - mut agreement: AgreementView, -) -> anyhow::Result<(ProposalView, ProposalView)> { - // Dispatch Agreement into separate Demand-Offer Proposal pair. - // TODO: We should get ProposalId here, but Agreement doen't store it anywhere. - let offer_id = agreement.pointer_typed("/offer/offerId")?; - let demand_id = agreement.pointer_typed("/demand/demandId")?; - let offer_proposal = agreement - .json - .pointer_mut("/offer/properties") - .map(Value::take) - .unwrap_or(Value::Null); - - let demand_proposal = agreement - .json - .pointer_mut("/demand/properties") - .map(Value::take) - .unwrap_or(Value::Null); - - let offer_proposal = ProposalView { - json: offer_proposal, - agreement_id: offer_id, - }; - - let demand_proposal = ProposalView { - json: demand_proposal, - agreement_id: demand_id, - }; - Ok((demand_proposal, offer_proposal)) -} - -impl Handler for CompositeNegotiator { - type Result = anyhow::Result; - - fn handle(&mut self, msg: ReactToAgreement, _: &mut Context) -> Self::Result { - let agreement_id = msg.agreement.agreement_id.clone(); - let (demand_proposal, offer_proposal) = to_proposal_views(msg.agreement).map_err(|e| { - anyhow!( - "Negotiator failed to extract Proposals from Agreement. {}", - e - ) - })?; - - // We expect that all `NegotiatorComponents` should return ready state. - // Otherwise we must reject Agreement proposals, because negotiations didn't end. - match self - .components - .negotiate_step(&demand_proposal, offer_proposal)? - { - NegotiationResult::Ready { .. } => { - self.components.on_agreement_approved(&agreement_id)?; - Ok(AgreementResponse::ApproveAgreement) - } - NegotiationResult::Reject { message, is_final } => { - Ok(AgreementResponse::RejectAgreement { - reason: Some(reason_with_extra( - message, - serde_json::json!({ "golem.proposal.rejection.is-final": is_final }), - )), - is_final, - }) - } - NegotiationResult::Negotiating { .. } => Ok(AgreementResponse::RejectAgreement { - reason: Some(reason_with_extra( - format!("Negotiations aren't finished."), - serde_json::json!({ "golem.proposal.rejection.is-final": false }), - )), - is_final: false, - }), - } - } -} - -impl Handler for CompositeNegotiator { - type Result = anyhow::Result<()>; - - fn handle(&mut self, msg: AgreementFinalized, _: &mut Context) -> Self::Result { - self.components - .on_agreement_terminated(&msg.agreement_id, &msg.result) - } -} - -impl Negotiator for CompositeNegotiator {} -impl Actor for CompositeNegotiator { - type Context = Context; -} diff --git a/agent/provider/src/market/negotiator/factory.rs b/agent/provider/src/market/negotiator/factory.rs deleted file mode 100644 index dbecc8d335..0000000000 --- a/agent/provider/src/market/negotiator/factory.rs +++ /dev/null @@ -1,64 +0,0 @@ -use actix::Addr; -use humantime; -use std::sync::Arc; -use structopt::StructOpt; - -use super::common::NegotiatorAddr; -use crate::market::config::MarketConfig; -use crate::market::negotiator::{AcceptAllNegotiator, CompositeNegotiator}; -use crate::market::ProviderMarket; - -/// Configuration for LimitAgreements Negotiator. -#[derive(StructOpt, Clone, Debug)] -pub struct LimitAgreementsNegotiatorConfig { - #[structopt(long, env, default_value = "1")] - pub max_simultaneous_agreements: u32, -} - -/// Configuration for LimitAgreements Negotiator. -#[derive(StructOpt, Clone, Debug)] -pub struct AgreementExpirationNegotiatorConfig { - #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "5min")] - pub min_agreement_expiration: std::time::Duration, - #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "10h")] - pub max_agreement_expiration: std::time::Duration, - #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "30min")] - pub max_agreement_expiration_without_deadline: std::time::Duration, - #[structopt(long, env, parse(try_from_str = humantime::parse_duration), default_value = "4min")] - pub debit_note_acceptance_deadline: std::time::Duration, -} - -/// Configuration for LimitAgreements Negotiator. -#[derive(StructOpt, Clone, Debug)] -pub struct CompositeNegotiatorConfig { - #[structopt(flatten)] - pub limit_agreements_config: LimitAgreementsNegotiatorConfig, - #[structopt(flatten)] - pub expire_agreements_config: AgreementExpirationNegotiatorConfig, -} - -#[derive(StructOpt, Clone, Debug)] -pub struct NegotiatorsConfig { - #[structopt(flatten)] - pub composite_config: CompositeNegotiatorConfig, -} - -pub fn create_negotiator( - market: Addr, - config: &MarketConfig, -) -> Arc { - let negotiator = match &config.negotiator_type[..] { - "Composite" => NegotiatorAddr::from( - CompositeNegotiator::new(market, &config.negotiator_config.composite_config).unwrap(), - ), - "AcceptAll" => NegotiatorAddr::from(AcceptAllNegotiator::new()), - _ => Default::default(), - }; - Arc::new(negotiator) -} - -impl Default for NegotiatorAddr { - fn default() -> Self { - NegotiatorAddr::from(AcceptAllNegotiator::new()) - } -} diff --git a/agent/provider/src/market/provider_market.rs b/agent/provider/src/market/provider_market.rs index adb008f363..fd30afebc5 100644 --- a/agent/provider/src/market/provider_market.rs +++ b/agent/provider/src/market/provider_market.rs @@ -6,31 +6,37 @@ use chrono::Utc; use derive_more::Display; use futures::prelude::*; use futures_util::FutureExt; +use serde_yaml; use std::collections::HashMap; -use std::convert::TryFrom; -use std::str::FromStr; +use std::convert::{TryFrom, TryInto}; +use std::fs::File; +use std::io::{BufReader, Write}; +use std::path::Path; use std::sync::Arc; +use tokio::sync::mpsc; use ya_agreement_utils::{AgreementView, OfferDefinition}; use ya_client::market::MarketProviderApi; -use ya_client::model::market::agreement_event::AgreementEventType; +use ya_client::model::market::agreement_event::{AgreementEventType, AgreementTerminator}; use ya_client::model::market::proposal::State; -use ya_client::model::market::{ - agreement_event::AgreementTerminator, Agreement, NewOffer, Proposal, ProviderEvent, Reason, -}; +use ya_client::model::market::{Agreement, NewOffer, Proposal, ProviderEvent, Reason}; use ya_client::model::NodeId; +use ya_client_model::market::NewProposal; +use ya_negotiators::factory::NegotiatorsConfig; +use ya_negotiators::{ + factory, AgreementAction, AgreementResult, NegotiatorAddr, NegotiatorCallbacks, ProposalAction, +}; use ya_std_utils::LogErr; use ya_utils_actix::{ actix_handler::ResultTypeGetter, actix_signal::SignalSlot, actix_signal_handler, forward_actix_handler, }; -use super::negotiator::factory; -use super::negotiator::{AgreementResponse, AgreementResult, NegotiatorAddr, ProposalResponse}; use super::Preset; use crate::display::EnableDisplay; use crate::market::config::MarketConfig; -use crate::market::termination_reason::GolemReason; +use crate::market::negotiator::builtin::*; +use crate::market::termination_reason::{GolemReason, ProviderAgreementResult}; use crate::tasks::task_manager::ClosingCause; use crate::tasks::{AgreementBroken, AgreementClosed, CloseAgreement}; @@ -85,7 +91,7 @@ struct Subscription { #[rtype(result = "Result<()>")] struct AgreementFinalized { id: String, - result: AgreementResult, + result: ProviderAgreementResult, } #[derive(Message)] @@ -121,6 +127,9 @@ pub struct ProviderMarket { /// Infinite tasks requiring to be killed on shutdown. handles: HashMap, + + /// Temporary - used only during initialization + callbacks: Option, } #[derive(Clone)] @@ -131,22 +140,74 @@ struct AsyncCtx { negotiator: Arc, } +fn load_negotiators_config(data_dir: &Path) -> Result { + // Register ya-provider built-in negotiators + register_negotiators(); + + let path = data_dir.join("negotiators-config.yaml"); + + log::debug!("Loading negotiators config: {:?}", path); + + Ok(match File::open(&path) { + Ok(file) => serde_yaml::from_reader(BufReader::new(file))?, + Err(_) => { + let mut negotiator_config = NegotiatorsConfig::default(); + + // Add default negotiators. + negotiator_config + .negotiators + .push(expiration::Config::from_env()?); + negotiator_config + .negotiators + .push(max_agreements::Config::from_env()?); + + let content = serde_yaml::to_string(&negotiator_config)?; + File::create(&path) + .map_err(|e| anyhow!("Can't create file: {:?}. Error: {}", path, e))? + .write_all(content.as_bytes())?; + negotiator_config + } + }) +} + impl ProviderMarket { // =========================================== // // Initialization // =========================================== // - pub fn new(api: MarketProviderApi, config: MarketConfig) -> ProviderMarket { - return ProviderMarket { + pub fn new( + api: MarketProviderApi, + data_dir: &Path, + config: MarketConfig, + ) -> Result { + let negotiator_config = load_negotiators_config(data_dir).map_err(|e| { + anyhow!( + "Failed to load negotiators config from {}. Error: {}", + data_dir.display(), + e + ) + })?; + + let negotiators_workdir = data_dir.join(&config.negotiators_workdir); + std::fs::create_dir_all(&negotiators_workdir)?; + + let (negotiator, callbacks) = factory::create_negotiator( + negotiator_config, + negotiators_workdir, + config.negotiators_plugins.clone(), + )?; + + Ok(ProviderMarket { api: Arc::new(api), - negotiator: Arc::new(NegotiatorAddr::default()), + negotiator, config: Arc::new(config), subscriptions: HashMap::new(), postponed_demands: Vec::new(), agreement_signed_signal: SignalSlot::::new(), agreement_terminated_signal: SignalSlot::::new(), handles: HashMap::new(), - }; + callbacks: Some(callbacks), + }) } fn async_context(&self, ctx: &mut Context) -> AsyncCtx { @@ -179,7 +240,7 @@ impl ProviderMarket { // =========================================== // fn on_agreement_approved(&mut self, msg: NewAgreement, _ctx: &mut Context) -> Result<()> { - log::info!("Got approved agreement [{}].", msg.agreement.agreement_id,); + log::info!("Got approved agreement [{}].", msg.agreement.id,); // At this moment we only forward agreement to outside world. self.agreement_signed_signal.send_signal(msg) } @@ -268,18 +329,18 @@ async fn dispatch_event( async fn process_proposal( ctx: AsyncCtx, subscription: Subscription, - demand: &Proposal, + their: &Proposal, ) -> Result<()> { - let proposal_id = &demand.proposal_id; + let proposal_id = &their.proposal_id; log::info!( "Got proposal [{}] from Requestor [{}] for subscription [{}].", proposal_id, - demand.issuer_id, + their.issuer_id, subscription.preset.name, ); - let prev_proposal = match &demand.prev_proposal_id { + let prev_proposal = match &their.prev_proposal_id { Some(prev_proposal_id) => ctx .api .get_proposal(&subscription.id, prev_proposal_id) @@ -297,16 +358,15 @@ async fn process_proposal( properties: subscription.offer.properties.clone(), constraints: subscription.offer.constraints.clone(), proposal_id: subscription.id.clone(), - issuer_id: NodeId::from_str("0x000000000000000000000000000000000000000")?, // How to set? + issuer_id: NodeId::default(), state: State::Initial, timestamp: Utc::now(), // How to set? prev_proposal_id: None, }, }; - let action = ctx - .negotiator - .react_to_proposal(prev_proposal, demand.clone()) + ctx.negotiator + .react_to_proposal(&subscription.id, &prev_proposal, &their) .await .map_err(|e| { anyhow!( @@ -314,47 +374,7 @@ async fn process_proposal( proposal_id, e ) - })?; - - log::info!( - "Decided to {} [{}] for subscription [{}].", - action, - proposal_id, - subscription.preset.name - ); - - match action { - ProposalResponse::CounterProposal { offer } => { - ctx.api - .counter_proposal(&offer, &subscription.id, proposal_id) - .await?; - } - ProposalResponse::AcceptProposal => { - ctx.api - .counter_proposal(&subscription.offer, &subscription.id, proposal_id) - .await?; - } - ProposalResponse::IgnoreProposal => log::info!("Ignoring proposal {:?}", proposal_id), - ProposalResponse::RejectProposal { reason, is_final } => { - if !is_final { - let sub_dem = SubscriptionProposal { - subscription_id: subscription.id.clone(), - proposal: demand.clone(), - }; - log::debug!( - "Postponing rejected Proposal [{}] from Requestor [{}]. Reason: {}", - demand.proposal_id, - demand.issuer_id, - reason.display() - ); - ctx.market.do_send(PostponeDemand(sub_dem)); - } - ctx.api - .reject_proposal(&subscription.id, proposal_id, &reason) - .await?; - } - }; - Ok(()) + }) } async fn process_agreement( @@ -369,76 +389,19 @@ async fn process_agreement( subscription.preset.name, ); - let config = ctx.config; let agreement = AgreementView::try_from(agreement) .map_err(|e| anyhow!("Invalid agreement. Error: {}", e))?; - let action = ctx - .negotiator - .react_to_agreement(&agreement) + ctx.negotiator + .react_to_agreement(&subscription.id, &agreement) .await .map_err(|e| { anyhow!( "Negotiator error while processing agreement [{}]. Error: {}", - agreement.agreement_id, + agreement.id, e ) - })?; - - log::info!( - "Decided to {} [{}] for subscription [{}].", - action, - agreement.agreement_id, - subscription.preset.name - ); - - match action { - AgreementResponse::ApproveAgreement => { - // Prepare Provider for Agreement. We aren't sure here, that approval will - // succeed, but we are obligated to reserve all promised resources for Requestor, - // so after `approve_agreement` will return, we are ready to create activities. - ctx.market - .send(NewAgreement { - agreement: agreement.clone(), - }) - .await? - .ok(); - - // TODO: We should retry approval, but only a few times, than we should - // give up since it's better to take another agreement. - let result = ctx - .api - .approve_agreement( - &agreement.agreement_id, - Some(config.session_id.clone()), - Some(config.agreement_approve_timeout), - ) - .await; - - if let Err(error) = result { - // Notify negotiator, that we couldn't approve. - let msg = AgreementFinalized { - id: agreement.agreement_id.clone(), - result: AgreementResult::ApprovalFailed, - }; - let _ = ctx.market.send(msg).await; - return Err(anyhow!( - "Failed to approve agreement [{}]. Error: {}", - agreement.agreement_id, - error - )); - } - - // We negotiated agreement and here responsibility of ProviderMarket ends. - // Notify outside world about agreement for further processing. - } - AgreementResponse::RejectAgreement { reason, .. } => { - ctx.api - .reject_agreement(&agreement.agreement_id, &reason) - .await?; - } - }; - Ok(()) + }) } async fn collect_agreement_events(ctx: AsyncCtx) { @@ -525,6 +488,149 @@ async fn collect_negotiation_events(ctx: AsyncCtx, subscription: Subscription) { } } +async fn collect_proposal_decisions( + ctx: AsyncCtx, + mut decisions: mpsc::UnboundedReceiver, +) { + while let Some(action) = decisions.recv().await { + process_proposal_decision(ctx.clone(), action) + .await + .map_err(|e| log::error!("Failed to process Proposal decision: {}", e)) + .ok(); + } +} + +async fn process_proposal_decision(ctx: AsyncCtx, decision: ProposalAction) -> anyhow::Result<()> { + // log::info!( + // "Decided to {} [{}] for subscription [{}].", + // decision, + // decision.id(), + // subscription.preset.name + // ); + + match decision { + ProposalAction::CounterProposal { + id, + subscription_id, + proposal, + } => { + ctx.api + .counter_proposal(&proposal, &subscription_id, &id) + .await?; + } + ProposalAction::AcceptProposal { + id, + subscription_id, + } => { + // Accepting Proposal means, that we counter with the same Proposal, as we + // sent in previous round. + let proposal = ctx.api.get_proposal(&subscription_id, &id).await?; + let proposal = ctx + .api + .get_proposal( + &subscription_id, + &proposal.prev_proposal_id.unwrap_or("".to_string()), + ) + .await?; + + ctx.api + .counter_proposal( + &NewProposal { + properties: proposal.properties, + constraints: proposal.constraints, + }, + &subscription_id, + &id, + ) + .await?; + } + ProposalAction::RejectProposal { + id, + subscription_id, + reason, + } => { + ctx.api + .reject_proposal(&subscription_id, &id, &reason) + .await?; + } + }; + Ok(()) +} + +async fn collect_agreement_decisions( + ctx: AsyncCtx, + mut decisions: mpsc::UnboundedReceiver, +) { + while let Some(action) = decisions.recv().await { + process_agreement_decision(ctx.clone(), action) + .await + .map_err(|e| log::error!("Failed to process Agreement decision: {}", e)) + .ok(); + } +} + +async fn process_agreement_decision( + ctx: AsyncCtx, + decision: AgreementAction, +) -> anyhow::Result<()> { + // log::info!( + // "Decided to {} [{}] for subscription [{}].", + // decision, + // decision.id(), + // subscription.preset.name + // ); + + let config = ctx.config; + match decision { + AgreementAction::ApproveAgreement { id, .. } => { + let agreement = ctx.api.get_agreement(&id).await?; + let agreement = AgreementView::try_from(&agreement)?; + + // Prepare Provider for Agreement. We aren't sure here, that approval will + // succeed, but we are obligated to reserve all promised resources for Requestor, + // so after `approve_agreement` will return, we are ready to create activities. + ctx.market + .send(NewAgreement { + agreement: agreement.clone(), + }) + .await? + .ok(); + + // TODO: We should retry approval, but only a few times, than we should + // give up since it's better to take another agreement. + let result = ctx + .api + .approve_agreement( + &id, + Some(config.session_id.clone()), + Some(config.agreement_approve_timeout), + ) + .await; + + if let Err(error) = result { + // Notify negotiator, that we couldn't approve. + let msg = AgreementFinalized { + id: id.clone(), + result: ProviderAgreementResult::ApprovalFailed, + }; + let _ = ctx.market.send(msg).await; + return Err(anyhow!( + "Failed to approve agreement [{}]. Error: {}", + id, + error + )); + } + + // We negotiated agreement and here responsibility of ProviderMarket ends. + // Notify outside world about agreement for further processing. + } + AgreementAction::RejectAgreement { reason, id, .. } => { + ctx.api.reject_agreement(&id, &reason).await?; + } + }; + Ok(()) +} + #[derive(Message)] #[rtype(result = "Result<()>")] struct ReSubscribe(String); @@ -578,10 +684,26 @@ impl Actor for ProviderMarket { // Note: There will be no collision with subscription ids stored normally here. self.handles.insert( "collect-agreement-events".to_string(), - ctx.spawn(collect_agreement_events(actx).into_actor(self)), + ctx.spawn(collect_agreement_events(actx.clone()).into_actor(self)), ); - self.negotiator = factory::create_negotiator(ctx.address(), &self.config); + if let Some(callbacks) = self.callbacks.take() { + self.handles.insert( + "collect-agreement-decisions".to_string(), + ctx.spawn( + collect_agreement_decisions(actx.clone(), callbacks.agreement_channel) + .into_actor(self), + ), + ); + + self.handles.insert( + "collect-proposal-decisions".to_string(), + ctx.spawn( + collect_proposal_decisions(actx.clone(), callbacks.proposal_channel) + .into_actor(self), + ), + ); + } } } @@ -652,7 +774,7 @@ impl Handler for ProviderMarket { let offer = ctx .negotiator - .create_offer(&msg.offer_definition) + .create_offer(&msg.offer_definition.into_template()) .await .log_err_msg(&format!( "Negotiator failed to create offer for preset [{}]", @@ -676,15 +798,17 @@ impl Handler for ProviderMarket { } } +/// Terminate Agreement on yagna Daemon. async fn terminate_agreement(api: Arc, msg: AgreementFinalized) { let id = msg.id; let reason = match &msg.result { - AgreementResult::ClosedByUs => GolemReason::success(), - AgreementResult::Broken { reason } => GolemReason::new(reason), + ProviderAgreementResult::ClosedByUs => GolemReason::success(), + ProviderAgreementResult::BrokenByUs { reason } => GolemReason::new(reason), // No need to terminate, because Requestor already did it. - AgreementResult::ClosedByRequestor => return (), + ProviderAgreementResult::ClosedByRequestor => return (), + ProviderAgreementResult::BrokenByRequestor { .. } => return (), // No need to terminate since we didn't have Agreement with Requestor. - AgreementResult::ApprovalFailed => return (), + ProviderAgreementResult::ApprovalFailed => return (), }; log::info!( @@ -786,7 +910,7 @@ impl Handler for ProviderMarket { let agreement_id = msg.id.clone(); let result = msg.result.clone(); - if let AgreementResult::ApprovalFailed = &msg.result { + if let ProviderAgreementResult::ApprovalFailed = &msg.result { self.agreement_terminated_signal .send_signal(CloseAgreement { cause: ClosingCause::ApprovalFail, @@ -802,13 +926,14 @@ impl Handler for ProviderMarket { let async_ctx = ctx.clone(); let future = async move { ctx.negotiator - .agreement_finalized(&agreement_id, result) + .agreement_finalized(&agreement_id, result.try_into()?) .await .log_err_msg(&format!( - "Negotiator failed while handling agreement [{}] finalize", + "Negotiator failed while handling agreement [{}] finalize.", &agreement_id, )) .ok(); + anyhow::Result::<()>::Ok(()) } .into_actor(self) .map(|_, myself, ctx| { @@ -827,6 +952,8 @@ impl Handler for ProviderMarket { } } +/// Market handles closed Agreement the same way as Broken Agreement, so we +/// translate event to AgreementFinalized and send to ourselves. impl Handler for ProviderMarket { type Result = ResponseFuture>; @@ -838,6 +965,8 @@ impl Handler for ProviderMarket { } } +/// Market handles closed Agreement the same way as Broken Agreement, so we +/// translate event to AgreementFinalized and send to ourselves. impl Handler for ProviderMarket { type Result = ResponseFuture>; @@ -918,7 +1047,7 @@ impl From for AgreementFinalized { fn from(msg: AgreementBroken) -> Self { AgreementFinalized { id: msg.agreement_id, - result: AgreementResult::Broken { reason: msg.reason }, + result: ProviderAgreementResult::BrokenByUs { reason: msg.reason }, } } } @@ -926,8 +1055,8 @@ impl From for AgreementFinalized { impl From for AgreementFinalized { fn from(msg: AgreementClosed) -> Self { let result = match msg.send_terminate { - true => AgreementResult::ClosedByUs, - false => AgreementResult::ClosedByRequestor, + true => ProviderAgreementResult::ClosedByUs, + false => ProviderAgreementResult::ClosedByRequestor, }; AgreementFinalized { @@ -936,3 +1065,23 @@ impl From for AgreementFinalized { } } } + +impl TryInto for ProviderAgreementResult { + type Error = anyhow::Error; + + fn try_into(self) -> anyhow::Result { + match self { + ProviderAgreementResult::ApprovalFailed => Err(anyhow!( + "AgreementResult::ApprovalFailed can't be translated" + )), + ProviderAgreementResult::ClosedByUs => Ok(AgreementResult::ClosedByUs), + ProviderAgreementResult::ClosedByRequestor => Ok(AgreementResult::ClosedByThem), + ProviderAgreementResult::BrokenByUs { reason } => Ok(AgreementResult::BrokenByUs { + reason: GolemReason::new(&reason).to_client(), + }), + ProviderAgreementResult::BrokenByRequestor { reason } => { + Ok(AgreementResult::BrokenByThem { reason }) + } + } + } +} diff --git a/agent/provider/src/market/termination_reason.rs b/agent/provider/src/market/termination_reason.rs index 5f059f28ae..6a926f57ec 100644 --- a/agent/provider/src/market/termination_reason.rs +++ b/agent/provider/src/market/termination_reason.rs @@ -11,6 +11,21 @@ use crate::display::EnableDisplay; use ya_client::model::market::Reason; +#[derive(Clone)] +pub enum ProviderAgreementResult { + /// Failed to approve agreement. (Agreement even wasn't created) + /// It can happen for Provider in case call to `approve_agreement` will fail. + ApprovalFailed, + /// Agreement was finished with success after first Activity. + ClosedByUs, + /// Agreement was finished with success by Requestor. + ClosedByRequestor, + /// Agreement was broken by us. It indicates not successful end of Agreement. + BrokenByUs { reason: BreakReason }, + /// Agreement was broken by Requestor. It indicates not successful end of Agreement. + BrokenByRequestor { reason: Option }, +} + #[derive(Display, EnumMessage, Debug, Clone, PartialEq)] #[non_exhaustive] pub enum BreakReason { diff --git a/agent/provider/src/payments/agreement.rs b/agent/provider/src/payments/agreement.rs index 156d0bf743..d2a5326f38 100644 --- a/agent/provider/src/payments/agreement.rs +++ b/agent/provider/src/payments/agreement.rs @@ -81,7 +81,7 @@ impl AgreementPayment { if let Some(deadline) = &debit_deadline { log::info!( "Requestor is expected to accept DebitNotes for Agreement [{}] in {}", - &agreement.agreement_id, + &agreement.id, deadline.display() ); } @@ -90,7 +90,7 @@ impl AgreementPayment { let (sender, receiver) = watch::channel(0); Ok(AgreementPayment { - agreement_id: agreement.agreement_id.clone(), + agreement_id: agreement.id.clone(), activities: HashMap::new(), payment_model, update_interval, diff --git a/agent/provider/src/payments/payments.rs b/agent/provider/src/payments/payments.rs index cb4751b786..cc35791cb1 100644 --- a/agent/provider/src/payments/payments.rs +++ b/agent/provider/src/payments/payments.rs @@ -178,20 +178,19 @@ impl Payments { ) -> Result<()> { log::info!( "Payments got signed agreement [{}]. Waiting for activities creation...", - &msg.agreement.agreement_id + &msg.agreement.id ); match AgreementPayment::new(&msg.agreement) { Ok(agreement) => { - self.agreements - .insert(msg.agreement.agreement_id.clone(), agreement); + self.agreements.insert(msg.agreement.id.clone(), agreement); Ok(()) } Err(error) => { //TODO: What should we do? Maybe terminate agreement? log::error!( "Failed to create payment model for agreement [{}]. Error: {}", - &msg.agreement.agreement_id, + &msg.agreement.id, error ); Err(error) diff --git a/agent/provider/src/provider_agent.rs b/agent/provider/src/provider_agent.rs index fda13b56f8..be4c99faf5 100644 --- a/agent/provider/src/provider_agent.rs +++ b/agent/provider/src/provider_agent.rs @@ -147,7 +147,7 @@ impl ProviderAgent { let mut hardware = hardware::Manager::try_new(&config)?; hardware.spawn_monitor(&config.hardware_file)?; - let market = ProviderMarket::new(api.market, args.market).start(); + let market = ProviderMarket::new(api.market, &data_dir, args.market)?.start(); let payments = Payments::new(api.activity.clone(), api.payment, args.payment).start(); let runner = TaskRunner::new(api.activity, args.runner, registry, data_dir)?.start(); let task_manager = diff --git a/agent/provider/src/startup_config.rs b/agent/provider/src/startup_config.rs index 2e7bd69a3b..c69613a8c4 100644 --- a/agent/provider/src/startup_config.rs +++ b/agent/provider/src/startup_config.rs @@ -27,9 +27,9 @@ use crate::payments::PaymentsConfig; use crate::tasks::config::TaskConfig; lazy_static::lazy_static! { - static ref DEFAULT_DATA_DIR: String = DataDir::new(clap::crate_name!()).to_string(); + pub static ref DEFAULT_DATA_DIR: String = DataDir::new(clap::crate_name!()).to_string(); - static ref DEFAULT_PLUGINS_DIR : PathBuf = default_plugins(); + pub static ref DEFAULT_PLUGINS_DIR : PathBuf = default_plugins(); } pub(crate) const PRESETS_JSON: &'static str = "presets.json"; pub(crate) const HARDWARE_JSON: &'static str = "hardware.json"; diff --git a/agent/provider/src/tasks/task_info.rs b/agent/provider/src/tasks/task_info.rs index a1a2de76ef..c00a9c4d3c 100644 --- a/agent/provider/src/tasks/task_info.rs +++ b/agent/provider/src/tasks/task_info.rs @@ -47,7 +47,7 @@ fn multi_activity_from(agreement: &AgreementView) -> Result { impl TaskInfo { pub fn from(agreement: &AgreementView) -> Result { Ok(TaskInfo { - agreement_id: agreement.agreement_id.clone(), + agreement_id: agreement.id.clone(), expiration: agreement_expiration_from(agreement)?, multi_activity: multi_activity_from(agreement)?, idle_agreement_timeout: Duration::from_secs(90), diff --git a/agent/provider/src/tasks/task_manager.rs b/agent/provider/src/tasks/task_manager.rs index dd0b686e8c..271e953876 100644 --- a/agent/provider/src/tasks/task_manager.rs +++ b/agent/provider/src/tasks/task_manager.rs @@ -47,7 +47,7 @@ pub enum ClosingCause { SingleActivity, } -/// Notifies TaskManager that Requestor close agreement. +/// Notifies TaskManager, that Requestor closed agreement. #[derive(Message, Clone)] #[rtype(result = "Result<()>")] pub struct CloseAgreement { @@ -246,7 +246,7 @@ impl TaskManager { } fn add_new_agreement(&mut self, msg: &NewAgreement) -> anyhow::Result { - let agreement_id = msg.agreement.agreement_id.clone(); + let agreement_id = msg.agreement.id.clone(); self.tasks.new_agreement(&agreement_id)?; let props = TaskInfo::from(&msg.agreement) @@ -340,12 +340,7 @@ impl Handler for TaskManager { actx.runner.send(msg.clone()).await??; actx.payments.send(msg.clone()).await??; - finish_transition( - &actx.myself, - &msg.agreement.agreement_id, - AgreementState::Initialized, - ) - .await + finish_transition(&actx.myself, &msg.agreement.id, AgreementState::Initialized).await } .into_actor(self) .map(