From a7e19fde1f2e630eb098dcd9ec93d529c5d17d5b Mon Sep 17 00:00:00 2001 From: Robin5605 Date: Sat, 12 Aug 2023 21:08:54 -0500 Subject: [PATCH] initial draft: rabbitmq --- Cargo.lock | 838 +++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 11 +- src/app_config.rs | 4 + src/client.rs | 399 +++++++++++--------- src/client/methods.rs | 65 +--- src/client/models.rs | 55 +-- src/error.rs | 24 ++ src/main.rs | 250 ++++++------- src/scanner.rs | 70 +--- 9 files changed, 1249 insertions(+), 467 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d967686..b4c56b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,80 @@ dependencies = [ "memchr", ] +[[package]] +name = "amq-protocol" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d40d8b2465c7959dd40cee32ba6ac334b5de57e9fca0cc756759894a4152a5d" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cb2100adae7da61953a2c3a01935d86caae13329fadce3333f524d6d6ce12e2" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "156ff13c8a3ced600b4e54ed826a2ae6242b6069d00dd98466827cef07d3daff" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751bbd7d440576066233e740576f1b31fdc6ab86cfabfbd48c548de77eca73e4" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.1" @@ -45,17 +119,175 @@ checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" dependencies = [ "flate2", "futures-core", + "futures-io", "memchr", "pin-project-lite", "tokio", ] +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 1.9.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.23", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-tar" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c49359998a76e32ef6e870dbc079ebad8f1e53e8441c5dd39d27b44493fe331" +dependencies = [ + "async-std", + "filetime", + "libc", + "pin-project", + "redox_syscall 0.2.16", + "xattr", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + +[[package]] +name = "async-trait" +version = "0.1.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + +[[package]] +name = "async_zip" +version = "0.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "795310de3218cde15219fc98c1cf7d8fe9db4865aab27fcf1d535d6cb61c6b54" +dependencies = [ + "crc32fast", + "futures-util", + "log", + "pin-project", + "thiserror", +] + [[package]] name = "atomic" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "autocfg" version = "1.1.0" @@ -132,6 +364,30 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand 1.9.0", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -171,6 +427,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.79" @@ -195,6 +460,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "time 0.1.45", + "wasm-bindgen", + "winapi", +] + [[package]] name = "cipher" version = "0.4.4" @@ -216,12 +496,27 @@ dependencies = [ "libloading", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "constant_time_eq" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -281,6 +576,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01" +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "digest" version = "0.10.7" @@ -292,21 +596,36 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dragonfly-client-rs" version = "0.1.0" dependencies = [ + "async-compression", + "async-tar", + "async_zip", + "chrono", "env_logger", "figment", "flate2", + "futures", + "futures-io", + "lapin", "log", "once_cell", "parking_lot", "reqwest", "serde", + "serde_json", "tar", "thiserror", "threadpool", + "tokio", "tracing", "tracing-subscriber", "yara", @@ -369,6 +688,30 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -411,6 +754,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -441,6 +796,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -448,6 +818,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -456,12 +827,49 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "futures-sink" version = "0.3.28" @@ -480,8 +888,11 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -499,6 +910,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + [[package]] name = "gimli" version = "0.27.3" @@ -511,6 +933,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.20" @@ -634,6 +1068,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -676,9 +1133,30 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" dependencies = [ + "block-padding", "generic-array", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -692,7 +1170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix", + "rustix 0.38.4", "windows-sys", ] @@ -720,6 +1198,37 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lapin" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f3067a1fcfbc3fc46455809c023e69b8f6602463201010f4ae5a3b572adb9dc" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -748,6 +1257,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.3" @@ -769,6 +1284,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "matchers" @@ -813,7 +1331,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -855,6 +1373,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -930,6 +1457,29 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher", + "des", + "getrandom", + "hmac", + "lazy_static", + "rc2", + "sha1", + "yasna", +] + +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1011,6 +1561,26 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pin-project" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "pin-project-lite" version = "0.2.10" @@ -1023,12 +1593,40 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing", +] + [[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + [[package]] name = "proc-macro2" version = "1.0.66" @@ -1066,6 +1664,26 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1163,10 +1781,26 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1179,6 +1813,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustix" +version = "0.37.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + [[package]] name = "rustix" version = "0.38.4" @@ -1188,10 +1836,65 @@ dependencies = [ "bitflags 2.3.3", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.3", "windows-sys", ] +[[package]] +name = "rustls" +version = "0.21.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-connector" +version = "0.18.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "060bcc1795b840d0e56d78f3293be5f652aa1611d249b0e63ffe19f4a8c9ae23" +dependencies = [ + "log", + "rustls", + "rustls-native-certs", + "rustls-webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "ryu" version = "1.0.15" @@ -1213,6 +1916,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -1325,6 +2038,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.8" @@ -1350,6 +2072,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "subtle" version = "2.5.0" @@ -1389,6 +2126,18 @@ dependencies = [ "xattr", ] +[[package]] +name = "tcp-stream" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4da30af7998f51ee1aa48ab24276fe303a697b004e31ff542b192c088d5630a5" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", +] + [[package]] name = "tempfile" version = "3.7.0" @@ -1396,9 +2145,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.0", "redox_syscall 0.3.5", - "rustix", + "rustix 0.38.4", "windows-sys", ] @@ -1450,6 +2199,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.24" @@ -1494,11 +2254,25 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -1667,6 +2441,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.4.0" @@ -1684,6 +2464,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1696,6 +2482,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.1" @@ -1705,6 +2497,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1777,6 +2575,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -1829,6 +2640,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -1949,6 +2769,12 @@ dependencies = [ "bindgen", ] +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zip" version = "0.6.6" @@ -1965,7 +2791,7 @@ dependencies = [ "hmac", "pbkdf2", "sha1", - "time", + "time 0.3.24", "zstd", ] diff --git a/Cargo.toml b/Cargo.toml index 4ecb781..816c44a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,17 +4,26 @@ version = "0.1.0" edition = "2021" [dependencies] +async-compression = { version = "0.4.1", features = ["gzip", "tokio", "futures-io"] } +async-tar = "0.4.2" +async_zip = "0.0.15" +chrono = "0.4.26" env_logger = "0.10.0" figment = { version = "0.10.10", features = ["env", "toml"] } flate2 = "1.0.26" +futures = "0.3.28" +futures-io = "0.3.28" +lapin = "2.3.1" log = "0.4.17" once_cell = "1.18.0" parking_lot = "0.12.1" -reqwest = { version = "0.11.18", features = ["blocking", "json", "serde_json", "gzip"] } +reqwest = { version = "0.11.18", features = ["blocking", "json", "serde_json", "gzip", "stream"] } serde = { version = "1.0.178", features = ["derive"] } +serde_json = "1.0.104" tar = "0.4.39" thiserror = "1.0.44" threadpool = "1.8.1" +tokio = { version = "1.29.1", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } yara = "0.20.0" diff --git a/src/app_config.rs b/src/app_config.rs index f275735..446d603 100644 --- a/src/app_config.rs +++ b/src/app_config.rs @@ -20,6 +20,8 @@ pub struct AppConfig { pub username: String, pub password: String, pub max_scan_size: u64, + pub amqp: String, + pub prefetch: u16, } impl Default for AppConfig { @@ -43,6 +45,8 @@ impl Default for AppConfig { load_duration: 60, wait_duration: 10, max_scan_size: 1.28e+8 as u64, // 128 MB + amqp: String::from("amqp://127.0.0.1:5672/%2f"), + prefetch: available_parallelism as u16, } } } diff --git a/src/client.rs b/src/client.rs index 81cb643..3051543 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,18 +1,18 @@ mod methods; mod models; +use lapin::{Consumer, Channel, options::{BasicPublishOptions, BasicQosOptions, QueueDeclareOptions, BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions}, BasicProperties, Connection, types::FieldTable, message::Delivery, ExchangeKind}; pub use methods::*; pub use models::*; +use tokio::sync::{Mutex, RwLock}; +use tracing::{info, trace, warn, error, instrument}; +use yara::Compiler; -use crate::{error::DragonflyError, APP_CONFIG}; +use crate::{error::DragonflyError, scanner::DistributionFile, APP_CONFIG}; use flate2::read::GzDecoder; -use parking_lot::{Condvar, Mutex, RwLock}; -use reqwest::{blocking::Client, StatusCode, Url}; -use std::{ - io::{Cursor, Read}, - time::Duration, -}; -use tracing::{error, info, trace, warn}; +use reqwest::{Url, Client}; +use std::io::{Cursor, Read}; +use futures::StreamExt; /// Type alias representing a tar archive pub type TarballType = tar::Archive>>; @@ -20,209 +20,265 @@ pub type TarballType = tar::Archive>>; /// Type alias representing a zip archive pub type ZipType = zip::ZipArchive>>; -pub struct AuthState { - pub access_token: RwLock, - pub authenticating: Mutex, - pub cvar: Condvar, +pub struct RulesState { + pub compiled_rules: yara::Rules, + pub commit_hash: String, } -pub struct RulesState { - pub rules: yara::Rules, - pub hash: String, +impl TryFrom for RulesState { + type Error = yara::Error; + + fn try_from(rules_response: RulesResponse) -> Result { + let rules_str = rules_response + .rules + .values() + .map(String::as_ref) + .collect::>() + .join("\n"); + + let compiled_rules = Compiler::new()?.add_rules_str(&rules_str)?.compile_rules()?; + let commit_hash = rules_response.hash; + + Ok(RulesState { compiled_rules, commit_hash }) + } } -#[warn(clippy::module_name_repetitions)] -pub struct DragonflyClient { - pub client: Client, - pub authentication_state: AuthState, - pub rules_state: RwLock, +pub struct AuthenticationState { + pub access_token: String, + pub expires_in: tokio::time::Duration, } -impl DragonflyClient { - pub fn new() -> Result { - let client = Client::builder().gzip(true).build()?; +impl From for AuthenticationState { + fn from(authentication_response: AuthenticationResponse) -> Self { + let access_token = authentication_response.access_token; + let expires_in = tokio::time::Duration::from_secs(u64::from(authentication_response.expires_in)); + Self { access_token, expires_in } + } +} - let auth_response = fetch_access_token(&client)?; - let rules_response = fetch_rules(&client, &auth_response.access_token)?; +pub struct DragonflyClient { + http_client: Client, + consumer: Mutex, + rule_updates_consumer: Mutex, + channel: Channel, + + pub authentication: RwLock, + pub rules: RwLock, +} - let auth_state = AuthState { - access_token: RwLock::new(auth_response.access_token), - authenticating: Mutex::new(false), - cvar: Condvar::new(), - }; - let rules_state = RwLock::new(RulesState { - rules: rules_response.compile()?, - hash: rules_response.hash, - }); +impl DragonflyClient { + pub async fn init(http_client: Client, amqp_connection: Connection) -> Result { + let channel = amqp_connection.create_channel().await?; + channel.basic_qos(APP_CONFIG.prefetch, BasicQosOptions::default()).await?; + info!("Created channel"); + + channel + .queue_declare( + "jobs", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + info!("Declared incoming jobs queue"); + + channel + .queue_declare( + "results", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + info!("Declared outgoing results queue"); + + channel + .exchange_declare( + "rule_updates", + ExchangeKind::Fanout, + ExchangeDeclareOptions::default(), + FieldTable::default() + ) + .await?; + info!("Declared rule_updates exchange"); + + let rule_updates_queue = channel + .queue_declare( + "", + QueueDeclareOptions { + auto_delete: true, + durable: false, + exclusive: true, + nowait: false, + passive: false, + }, + FieldTable::default(), + ) + .await?; + info!("Declared exlusive rule updates queue (name: {})", rule_updates_queue.name()); + + channel + .queue_bind( + rule_updates_queue.name().as_str(), + "rule_updates", + "", + QueueBindOptions::default(), + FieldTable::default() + ) + .await?; + info!("Bound rules update queue (name: {}) to exchange rules_updated", rule_updates_queue.name()); + + let rule_updates_consumer = channel + .basic_consume( + rule_updates_queue.name().as_str(), + "rules_update", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + let consumer = channel + .basic_consume( + "jobs", + "incoming_jobs", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + trace!("Performing initial authentication"); + // First fetch an access token + let authentication_response = fetch_access_token(&http_client).await?; + let authentication_state = AuthenticationState::from(authentication_response); + info!("Successfully performed initial authentication"); + + trace!("Fetching initial rules"); + // Then use the access token to fetch and compile YARA rules + let rules_response = fetch_rules(&http_client, &authentication_state.access_token).await?; + let rules_state = RulesState::try_from(rules_response)?; + info!("Successfully fetched initial rules"); Ok(Self { - client, - authentication_state: auth_state, - rules_state, + http_client, + consumer: Mutex::new(consumer), + rule_updates_consumer: Mutex::new(rule_updates_consumer), + channel, + + authentication: RwLock::new(authentication_state), + + rules: RwLock::new(rules_state), }) } - /// Update the state with a new access token. - /// - /// If an error occurs while reauthenticating, the function retries with an exponential backoff - /// described by the equation `min(10 * 60, 2^(x - 1))` where `x` is the number of failed tries. - pub fn reauthenticate(&self) { - trace!("Trying to lock to check if we're authenticating."); - let mut authing = self.authentication_state.authenticating.lock(); - trace!("Acquired lock"); - if *authing { - trace!("Another thread is authenticating. Waiting for it to finish."); - self.authentication_state.cvar.wait(&mut authing); - trace!("Was notified, returning"); - return; + /// Receive a delivery from the message queue + pub async fn receive_delivery(&self) -> Result { + match self.consumer.lock().await.next().await { + Some(Ok(delivery)) => Ok(delivery), + Some(Err(err)) => Err(DragonflyError::from(err)), + None => Err(DragonflyError::ConsumerCancelled), } - trace!("No other thread is authenticating. Trying to reauthenticate."); - *authing = true; - drop(authing); + } + + /// Receive a delivery from the rule updates queue + pub async fn receive_rule_update(&self) -> Result { + match self.rule_updates_consumer.lock().await.next().await { + Some(Ok(delivery)) => Ok(delivery), + Some(Err(err)) => Err(DragonflyError::from(err)), + None => Err(DragonflyError::ConsumerCancelled), + } + } + + pub async fn push_results(&self, results: &SubmitJobResultsSuccess) -> Result<(), DragonflyError> { + let serialized_results = serde_json::to_string(&results)?; + self.channel.basic_publish( + "", + "results", + BasicPublishOptions::default(), + serialized_results.as_bytes(), + BasicProperties::default(), + ).await?; - let access_token; + Ok(()) + } + + #[instrument(skip(self), name = "reauthenticating")] + pub async fn reauthenticate(&self) { + trace!("Waiting for write lock"); + let mut lock = self.authentication.write().await; + info!("Acquired write lock"); let base = 2_f64; let initial_timeout = 1_f64; let mut tries = 0; - loop { - let r = fetch_access_token(self.get_http_client()); - match r { - Ok(authentication_response) => { - access_token = authentication_response.access_token; - break; - } + let authentication_response = loop { + match fetch_access_token(&self.http_client).await { + Ok(authentication_response) => break authentication_response, Err(e) => { let sleep_time = if tries < 10 { let t = initial_timeout * base.powf(f64::from(tries)); - warn!("Failed to reauthenticate after {tries} tries! Error: {e:#?}. Trying again in {t:.3} seconds"); + warn!("Failed after {tries} tries! Error: {e:#?}. Trying again in {t:.3} seconds"); t } else { - error!("Failed to reauthenticate after {tries} tries! Error: {e:#?}. Trying again in 600.000 seconds"); + error!("Failed after {tries} tries! Error: {e:#?}. Trying again in 600.000 seconds"); 600_f64 }; - std::thread::sleep(Duration::from_secs_f64(sleep_time)); + tokio::time::sleep(std::time::Duration::from_secs(sleep_time as u64)).await; tries += 1; } } - } - - trace!("Successfully got new access token!"); + }; - *self.authentication_state.access_token.write() = access_token; + trace!("Got new access token!"); - let mut authing = self.authentication_state.authenticating.lock(); - *authing = false; - self.authentication_state.cvar.notify_all(); + *lock = AuthenticationState::from(authentication_response); info!("Successfully reauthenticated."); } - /// Update the global ruleset. Waits for a write lock. - pub fn update_rules(&self) -> Result<(), DragonflyError> { - let response = match fetch_rules( - self.get_http_client(), - &self.authentication_state.access_token.read(), - ) { - Err(err) if err.status() == Some(StatusCode::UNAUTHORIZED) => { - info!("Got 401 UNAUTHORIZED while updating rules"); - self.reauthenticate(); - info!("Fetching rules again..."); - fetch_rules( - self.get_http_client(), - &self.authentication_state.access_token.read(), - ) - } - - Ok(response) => Ok(response), - - Err(err) => Err(err), - }?; - - let mut rules_state = self.rules_state.write(); - rules_state.rules = response.compile()?; - rules_state.hash = response.hash; + #[instrument(skip(self), name = "updating_rules")] + pub async fn update_rules(&self) { + trace!("Waiting for write lock"); + let mut lock = self.rules.write().await; + info!("Acquired write lock"); - Ok(()) - } - - pub fn bulk_get_job(&self, n_jobs: usize) -> reqwest::Result> { - let access_token = self.authentication_state.access_token.read(); - match fetch_bulk_job(self.get_http_client(), &access_token, n_jobs) { - Err(err) if err.status() == Some(StatusCode::UNAUTHORIZED) => { - drop(access_token); // Drop the read lock - info!("Got 401 UNAUTHORIZED while doing a bulk fetch job request"); - self.reauthenticate(); - info!("Doing a bulk fetch job again..."); - fetch_bulk_job( - self.get_http_client(), - &self.authentication_state.access_token.read(), - n_jobs, - ) - } - - other => other, - } - } - - /// Report an error to the server. - pub fn send_error(&self, body: &SubmitJobResultsError) -> reqwest::Result<()> { - let access_token = self.authentication_state.access_token.read(); - match send_error(self.get_http_client(), &access_token, body) { - Err(http_err) if http_err.status() == Some(StatusCode::UNAUTHORIZED) => { - drop(access_token); // Drop the read lock - info!("Got 401 UNAUTHORIZED while sending success"); - self.reauthenticate(); - info!("Sending error body again..."); - send_error( - self.get_http_client(), - &self.authentication_state.access_token.read(), - body, - ) - } - - other => other, - } - } + let base = 2_f64; + let initial_timeout = 1_f64; + let mut tries = 0; - /// Submit the results of a scan to the server, given the job and the scan results of each - /// distribution - pub fn send_success(&self, body: &SubmitJobResultsSuccess) -> reqwest::Result<()> { - let access_token = self.authentication_state.access_token.read(); - match send_success(self.get_http_client(), &access_token, body) { - Err(http_err) if http_err.status() == Some(StatusCode::UNAUTHORIZED) => { - drop(access_token); // Drop the read lock - info!("Got 401 UNAUTHORIZED while sending success"); - self.reauthenticate(); - info!("Sending success body again..."); - send_success( - self.get_http_client(), - &self.authentication_state.access_token.read(), - body, - ) - } + info!("Fetching rules"); + let rules_state = loop { + match fetch_rules(&self.http_client, &self.authentication.read().await.access_token).await { + Ok(rules_response) => match RulesState::try_from(rules_response) { + Ok(rules_state) => break rules_state, + Err(yara_error) => error!("YARA error: {yara_error}"), + } + Err(http_err) => error!("HTTP Error: {http_err}"), + }; + + let sleep_time = if tries < 10 { + initial_timeout * base.powf(f64::from(tries)) + } else { + 600_f64 + }; + + warn!("Failed in {tries} attempts, trying again in {sleep_time} seconds"); + tokio::time::sleep(std::time::Duration::from_secs(sleep_time as u64)).await; + tries += 1; + }; + info!("Got new rules"); - other => other, - } - } + *lock = rules_state; - /// Return a reference to the underlying HTTP Client - pub fn get_http_client(&self) -> &Client { - &self.client + info!("Successfully updated rules."); } } -pub fn fetch_tarball( - http_client: &Client, - download_url: &Url, -) -> Result { - let response = http_client.get(download_url.clone()).send()?; +pub fn fetch_tarball(download_url: &Url) -> Result { + let response = reqwest::blocking::get(download_url.clone())?; - let decompressed = GzDecoder::new(response); let mut cursor: Cursor> = Cursor::new(Vec::new()); + let decompressed = GzDecoder::new(response); decompressed .take(APP_CONFIG.max_scan_size) .read_to_end(cursor.get_mut())?; @@ -230,8 +286,8 @@ pub fn fetch_tarball( Ok(tar::Archive::new(cursor)) } -pub fn fetch_zipfile(http_client: &Client, download_url: &Url) -> Result { - let response = http_client.get(download_url.to_string()).send()?; +pub fn fetch_zipfile(download_url: &Url) -> Result { + let response = reqwest::blocking::get(download_url.to_string())?; let mut cursor = Cursor::new(Vec::new()); response @@ -240,3 +296,14 @@ pub fn fetch_zipfile(http_client: &Client, download_url: &Url) -> Result Result { + let parsed_download_url: Url = download_url.parse().unwrap(); + if download_url.ends_with(".tar.gz") { + let tar = fetch_tarball(&parsed_download_url)?; + Ok(DistributionFile::Tar(tar)) + } else { + let zip = fetch_zipfile(&parsed_download_url)?; + Ok(DistributionFile::Zip(zip)) + } +} diff --git a/src/client/methods.rs b/src/client/methods.rs index 11b0ee6..f1ed34d 100644 --- a/src/client/methods.rs +++ b/src/client/methods.rs @@ -1,11 +1,12 @@ -use super::models; +use reqwest::Client; + +use crate::app_config::APP_CONFIG; -use crate::APP_CONFIG; -use reqwest::blocking::Client; +use super::models; -pub fn fetch_access_token(http_client: &Client) -> reqwest::Result { +pub async fn fetch_access_token(http_client: &Client) -> reqwest::Result { let url = format!("https://{}/oauth/token", APP_CONFIG.auth0_domain); - let json_body = models::AuthBody { + let json_body = models::AuthenticationBody { client_id: &APP_CONFIG.client_id, client_secret: &APP_CONFIG.client_secret, audience: &APP_CONFIG.audience, @@ -17,63 +18,23 @@ pub fn fetch_access_token(http_client: &Client) -> reqwest::Result reqwest::Result> { - http_client - .post(format!("{}/jobs", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .query(&[("batch", n_jobs)]) - .send()? - .error_for_status()? - .json() -} - -pub fn fetch_rules( +pub async fn fetch_rules( http_client: &Client, access_token: &str, ) -> reqwest::Result { http_client .get(format!("{}/rules", APP_CONFIG.base_url)) .header("Authorization", format!("Bearer {access_token}")) - .send()? + .send() + .await? .error_for_status()? .json() -} - -pub fn send_success( - http_client: &Client, - access_token: &str, - body: &models::SubmitJobResultsSuccess, -) -> reqwest::Result<()> { - http_client - .put(format!("{}/package", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .json(&body) - .send()? - .error_for_status()?; - - Ok(()) -} - -pub fn send_error( - http_client: &Client, - access_token: &str, - body: &models::SubmitJobResultsError, -) -> reqwest::Result<()> { - http_client - .put(format!("{}/package", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .json(&body) - .send()? - .error_for_status()?; - - Ok(()) + .await } diff --git a/src/client/models.rs b/src/client/models.rs index b9af879..3e93c56 100644 --- a/src/client/models.rs +++ b/src/client/models.rs @@ -1,10 +1,5 @@ -use serde::Serialize; -use serde::{self, Deserialize}; use std::collections::HashMap; -use std::fmt::Display; -use yara::{Compiler, Rules}; - -use crate::error::DragonflyError; +use serde::{Serialize, Deserialize}; #[derive(Debug, Serialize)] pub struct SubmitJobResultsSuccess { @@ -15,36 +10,10 @@ pub struct SubmitJobResultsSuccess { /// Contains all rule identifiers matched for the entire release. pub rules_matched: Vec, - - /// The commit hash of the ruleset used to produce these results. - pub commit: String, -} - -#[derive(Debug, Serialize)] -pub struct SubmitJobResultsError { - pub name: String, - pub version: String, - pub reason: String, -} - -impl Display for SubmitJobResultsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "Name: {}", self.name)?; - writeln!(f, "Version: {}", self.version)?; - writeln!(f, "Reason: {}", self.reason)?; - - Ok(()) - } -} - -pub enum SubmitJobResultsBody { - Success(SubmitJobResultsSuccess), - Error(SubmitJobResultsError), } #[derive(Debug, Deserialize)] pub struct Job { - pub hash: String, pub name: String, pub version: String, pub distributions: Vec, @@ -56,33 +25,15 @@ pub struct RulesResponse { pub rules: HashMap, } -impl RulesResponse { - /// Compile the rules from the response - pub fn compile(&self) -> Result { - let rules_str = self - .rules - .values() - .map(String::as_ref) - .collect::>() - .join("\n"); - - let compiled_rules = Compiler::new()? - .add_rules_str(&rules_str)? - .compile_rules()?; - - Ok(compiled_rules) - } -} - #[derive(Debug, Deserialize)] -pub struct AuthResponse { +pub struct AuthenticationResponse { pub access_token: String, pub expires_in: u32, pub token_type: String, } #[derive(Debug, Serialize)] -pub struct AuthBody<'a> { +pub struct AuthenticationBody<'a> { pub client_id: &'a str, pub client_secret: &'a str, pub audience: &'a str, diff --git a/src/error.rs b/src/error.rs index 5f34ea3..cbd1138 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,10 @@ use std::io; use figment::Error as ConfigError; +use lapin; +use serde_json; use thiserror::Error; +use tokio; use zip::result::ZipError; #[allow(clippy::module_name_repetitions)] @@ -43,6 +46,27 @@ pub enum DragonflyError { source: ConfigError, }, + #[error("AMQP Client Error: {source:#?}")] + LapinError { + #[from] + source: lapin::Error, + }, + + #[error("Unable to join panicked task: {source:#?}")] + JoinError { + #[from] + source: tokio::task::JoinError, + }, + + #[error("Unable to deserialize JSON: {source:#?}")] + SerdeJsonError { + #[from] + source: serde_json::Error, + }, + + #[error("Consumer cancelled")] + ConsumerCancelled, + #[allow(dead_code)] #[error("Download too large: '{0:#?}'")] DownloadTooLarge(String), diff --git a/src/main.rs b/src/main.rs index 9de73b8..6343f82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,177 +1,153 @@ mod app_config; -mod client; mod error; mod exts; mod scanner; mod utils; +mod client; -use std::{ - sync::{ - mpsc::{self, SyncSender}, - Arc, - }, - time::Duration, -}; +use std::sync::Arc; +use std::time::Instant; use client::DragonflyClient; +use client::download_distribution; use error::DragonflyError; -use reqwest::blocking::Client; -use threadpool::ThreadPool; -use tracing::{debug, error, info, span, trace, Level}; +use lapin::{ + message::Delivery, + options::{ + BasicAckOptions, + BasicRejectOptions, + }, + Connection, ConnectionProperties, +}; +use reqwest::Client; +use scanner::{Distribution, DistributionScanResults}; +use serde_json; +use tracing::field; +use tracing::info_span; +use tracing::instrument; +use tracing::warn; +use tracing::{error, info, trace, trace_span, Instrument}; use tracing_subscriber::EnvFilter; -use yara::Rules; +use utils::create_inspector_url; use crate::{ app_config::APP_CONFIG, - client::{Job, SubmitJobResultsBody, SubmitJobResultsError}, - scanner::{scan_all_distributions, PackageScanResults}, + scanner::PackageScanResults, + client::Job, }; -/// The actual scanning logic. -/// -/// Takes the job to be scanned, the compiled rules, the commit hash being used, and the HTTP -/// client (for downloading distributions), and returns the `PackageScanResults`. -fn scanner( - http_client: &Client, - job: &Job, - rules: &Rules, - commit_hash: &str, -) -> Result { - let distribution_scan_results = scan_all_distributions(http_client, rules, job)?; - - let package_scan_result = PackageScanResults::new( - job.name.to_string(), - job.version.to_string(), - distribution_scan_results, - commit_hash.to_string(), - ); +fn runner(client: Arc, job: Job) -> Result { + let mut all_distribution_scan_results: Vec = Vec::new(); + for download_url in &job.distributions { + let _span = trace_span!("Scanner", distribution = download_url.split("/").last().unwrap_or("/")).entered(); + trace!("Downloading distribution"); + let file = download_distribution(download_url)?; + trace!("Successfully downloaded distribution"); + let inspector_url = create_inspector_url( + &job.name, + &job.version, + &download_url.clone().parse().unwrap(), + ); + let mut distribution = Distribution { + file, + inspector_url, + }; + trace!("Blocking until rules are available"); + let rules_state = client.rules.blocking_read(); + trace!("Got read lock on rules, starting scan"); + let start = Instant::now(); + let distribution_scan_result = distribution.scan(&rules_state.compiled_rules)?; + let end = Instant::now(); + let duration = end - start; + trace!("Finished scan! Took {}s {}ms ", duration.as_secs(), duration.subsec_millis()); + + all_distribution_scan_results.push(distribution_scan_result); + } - Ok(package_scan_result) + Ok(PackageScanResults { + name: job.name, + version: job.version, + distribution_scan_results: all_distribution_scan_results, + }) } -/// The job to run in the threadpool. -fn runner(client: &DragonflyClient, job: Job, tx: &SyncSender) { - let span = span!(Level::INFO, "Job", name = job.name, version = job.version); - let _enter = span.enter(); - let rules_state = client.rules_state.read(); - let send_result = match scanner( - client.get_http_client(), - &job, - &rules_state.rules, - &rules_state.hash, - ) { - Ok(package_scan_results) => tx.send(SubmitJobResultsBody::Success( - package_scan_results.build_body(), - )), - Err(err) => tx.send(SubmitJobResultsBody::Error(SubmitJobResultsError { - name: job.name, - version: job.version, - reason: format!("{err:#?}"), - })), - }; - - if send_result.is_err() { - error!("No more receivers listening across channel!"); +async fn handle_delivery( + client: Arc, + delivery: &Delivery, +) -> Result<(), DragonflyError> { + let job: Job = serde_json::from_slice(&delivery.data)?; + let span = info_span!("Job", name = &job.name, version = &job.version); + async move { + trace!("Spawning blocking scanner job in threadpool"); + let results = tokio::task::spawn_blocking({ + let client = Arc::clone(&client); + move || runner(client, job) + }).await??; + trace!("Scanner job in threadpool finished"); + trace!("Pushing results onto results queue"); + client.push_results(&results.build_body()).await?; + trace!("Ack'ing original delivery"); + delivery.ack(BasicAckOptions::default()).await?; + info!("Acknowledged delivery"); + + Ok(()) } + .instrument(span) + .await } -fn main() -> Result<(), DragonflyError> { +#[tokio::main] +async fn main() -> Result<(), DragonflyError> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); - let client = Arc::new(DragonflyClient::new()?); - let (tx, rx) = mpsc::sync_channel(1024); + let http_client = Client::new(); - // We spawn `n_jobs` threads using a threadpool for processing jobs, +1 more thread to send the - // results. The main thread will handle requesting the jobs and submitting them to the threadpool. - let n_jobs = APP_CONFIG.threads; - let pool = ThreadPool::new(n_jobs); - debug!("Started threadpool with {} workers", n_jobs); + let connection = + Connection::connect(&APP_CONFIG.amqp, ConnectionProperties::default()).await?; + info!("Established connection to AMQP server"); + let client = Arc::new(DragonflyClient::init(http_client, connection).await?); - // Spawning the "sender" thread - std::thread::spawn({ + tokio::spawn({ let client = Arc::clone(&client); - trace!("Starting loader thread"); - move || loop { - match rx.recv() { - Ok(SubmitJobResultsBody::Success(success_body)) => { - let span = span!( - Level::INFO, - "Job", - name = success_body.name, - version = success_body.version - ); - let _enter = span.enter(); - - trace!("Received success body, sending upstream..."); - trace!("Success body: {success_body:#?}"); - if let Err(err) = client.send_success(&success_body) { - error!("Unexpected error while sending success: {err}"); - } else { - info!("Successfully sent success!"); - } - } - - Ok(SubmitJobResultsBody::Error(error_body)) => { - let span = span!( - Level::INFO, - "Job", - name = error_body.name, - version = error_body.version - ); - let _enter = span.enter(); + async move { + loop { + let sleep = client.authentication.read().await.expires_in - tokio::time::Duration::from_secs(10); + tokio::time::sleep(sleep).await; + client.reauthenticate().await; + } + } + }); - trace!("Received error body, sending upstream..."); - trace!("Error body: {error_body}"); - if let Err(err) = client.send_error(&error_body) { - error!("Unexpected error while sending error: {err}"); - } else { - info!("Successfully sent error!"); - } + tokio::spawn({ + let client = Arc::clone(&client); + async move { + loop { + match client.receive_rule_update().await { + Ok(_) => client.update_rules().await, + Err(err) => error!("Error from rule updates queue: {err}"), } - - Err(_) => error!("No more transmitters!"), } } }); loop { - info!("Fetching {} bulk jobs...", APP_CONFIG.bulk_size); - match client.bulk_get_job(APP_CONFIG.bulk_size) { - Ok(jobs) => { - if jobs.is_empty() { - debug!("Bulk job request returned no jobs"); - } - - info!("Successfully fetched {} jobs", jobs.len()); - - for job in jobs { - info!("Submitting {} v{} for execution", job.name, job.version); - let rules_state = client.rules_state.read(); - if job.hash != rules_state.hash { - info!( - "Must update rules, updating from {} to {}", - rules_state.hash, job.hash - ); - drop(rules_state); - if let Err(err) = client.update_rules() { - error!("Error while updating rules: {err}"); + trace!("Waiting for message"); + match client.receive_delivery().await { + Ok(delivery) => { + trace!("Message received"); + let client = Arc::clone(&client); + tokio::task::spawn(async move { + if let Err(err) = handle_delivery(Arc::clone(&client), &delivery).await { + error!("Error while scanning: {err}"); + match delivery.reject(BasicRejectOptions { requeue: false }).await { + Ok(()) => warn!("Rejected message"), + Err(err) => error!("Error while rejecting: {err}"), } } - - pool.execute({ - let client = Arc::clone(&client); - let tx = tx.clone(); - move || runner(&client, job, &tx) - }); - } - - trace!("Finished loading jobs into queue!"); + }); } - - Err(err) => error!("Unexpected HTTP error: {err}"), + Err(err) => error!("Error while consuming: {err}"), } - - std::thread::sleep(Duration::from_secs(APP_CONFIG.load_duration)); } } diff --git a/src/scanner.rs b/src/scanner.rs index 32a57fa..8aa1838 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -4,14 +4,13 @@ use std::{ path::{Path, PathBuf}, }; -use reqwest::{blocking::Client, Url}; +use reqwest::Url; use yara::Rules; use crate::{ - client::{fetch_tarball, fetch_zipfile, Job, SubmitJobResultsSuccess, TarballType, ZipType}, + client::{TarballType, ZipType, SubmitJobResultsSuccess}, error::DragonflyError, exts::RuleExt, - utils::create_inspector_url, }; #[derive(Debug, Hash, Eq, PartialEq)] @@ -39,7 +38,7 @@ impl FileScanResult { } /// Scan an archive format using Yara rules. -trait Scan { +pub trait Scan { fn scan(&mut self, rules: &Rules) -> Result, DragonflyError>; } @@ -75,15 +74,23 @@ impl Scan for ZipType { } } +pub enum DistributionFile { + Zip(ZipType), + Tar(TarballType), +} + /// A distribution consisting of an archive and an inspector url. -struct Distribution { - file: Box, - inspector_url: Url, +pub struct Distribution { + pub file: DistributionFile, + pub inspector_url: Url, } impl Distribution { - fn scan(&mut self, rules: &Rules) -> Result { - let results = self.file.scan(rules)?; + pub fn scan(&mut self, rules: &Rules) -> Result { + let results = match &mut self.file { + DistributionFile::Zip(zip_archive) => zip_archive.scan(&rules), + DistributionFile::Tar(tar_archive) => tar_archive.scan(&rules), + }?; Ok(DistributionScanResults::new( results, @@ -160,28 +167,14 @@ impl DistributionScanResults { } } +#[derive(Debug)] pub struct PackageScanResults { pub name: String, pub version: String, pub distribution_scan_results: Vec, - pub commit_hash: String, } impl PackageScanResults { - pub fn new( - name: String, - version: String, - distribution_scan_results: Vec, - commit_hash: String, - ) -> Self { - Self { - name, - version, - distribution_scan_results, - commit_hash, - } - } - /// Format the package scan results into something that can be sent over the API pub fn build_body(&self) -> SubmitJobResultsSuccess { let highest_score_distribution = self @@ -212,39 +205,10 @@ impl PackageScanResults { score, inspector_url, rules_matched, - commit: self.commit_hash.clone(), } } } -/// Scan all the distributions of the given job against the given ruleset -/// -/// Uses the provided HTTP client to download each distribution. -pub fn scan_all_distributions( - http_client: &Client, - rules: &Rules, - job: &Job, -) -> Result, DragonflyError> { - let mut distribution_scan_results = Vec::with_capacity(job.distributions.len()); - for distribution in &job.distributions { - let download_url: Url = distribution.parse().unwrap(); - let inspector_url = create_inspector_url(&job.name, &job.version, &download_url); - - let mut dist = Distribution { - file: if distribution.ends_with(".tar.gz") { - Box::new(fetch_tarball(http_client, &download_url)?) - } else { - Box::new(fetch_zipfile(http_client, &download_url)?) - }, - inspector_url, - }; - let distribution_scan_result = dist.scan(rules)?; - distribution_scan_results.push(distribution_scan_result); - } - - Ok(distribution_scan_results) -} - /// Scan a file given it implements `Read`. /// /// # Arguments