From 799b7eaf9cacd85e6f1b3c6d9158042de29870a2 Mon Sep 17 00:00:00 2001 From: Robin5605 Date: Sat, 12 Aug 2023 21:08:54 -0500 Subject: [PATCH 1/2] initial draft: rabbitmq --- Cargo.lock | 902 +++++++++++++++++++++++++++++++++++++++--- Cargo.toml | 9 + src/app_config.rs | 4 + src/client.rs | 362 ++++++++++++----- src/client/methods.rs | 67 +--- src/client/models.rs | 50 +-- src/error.rs | 21 + src/main.rs | 227 ++++++----- src/scanner.rs | 70 +--- 9 files changed, 1310 insertions(+), 402 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c3aca5..d25f668 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,13 +30,87 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" dependencies = [ "memchr", ] +[[package]] +name = "amq-protocol" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d40d8b2465c7959dd40cee32ba6ac334b5de57e9fca0cc756759894a4152a5d" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cb2100adae7da61953a2c3a01935d86caae13329fadce3333f524d6d6ce12e2" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "156ff13c8a3ced600b4e54ed826a2ae6242b6069d00dd98466827cef07d3daff" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751bbd7d440576066233e740576f1b31fdc6ab86cfabfbd48c548de77eca73e4" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.3" @@ -45,17 +119,175 @@ checksum = "bb42b2197bf15ccb092b62c74515dbd8b86d0effd934795f6687c93b6e679a2c" dependencies = [ "flate2", "futures-core", + "futures-io", "memchr", "pin-project-lite", "tokio", ] +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 1.9.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.23", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-tar" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c49359998a76e32ef6e870dbc079ebad8f1e53e8441c5dd39d27b44493fe331" +dependencies = [ + "async-std", + "filetime", + "libc", + "pin-project", + "redox_syscall 0.2.16", + "xattr 0.2.3", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + +[[package]] +name = "async-trait" +version = "0.1.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + +[[package]] +name = "async_zip" +version = "0.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "795310de3218cde15219fc98c1cf7d8fe9db4865aab27fcf1d535d6cb61c6b54" +dependencies = [ + "crc32fast", + "futures-util", + "log", + "pin-project", + "thiserror", +] + [[package]] name = "atomic" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "autocfg" version = "1.1.0" @@ -132,11 +364,35 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand 1.9.0", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" @@ -171,6 +427,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.83" @@ -196,6 +461,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "cipher" version = "0.4.4" @@ -217,12 +496,27 @@ dependencies = [ "libloading", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "constant_time_eq" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -282,6 +576,15 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "digest" version = "0.10.7" @@ -293,21 +596,36 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dragonfly-client-rs" version = "0.1.0" dependencies = [ + "async-compression", + "async-tar", + "async_zip", + "chrono", "env_logger", "figment", "flate2", + "futures", + "futures-io", + "lapin", "log", "once_cell", "parking_lot", "reqwest", "serde", + "serde_json", "tar", "thiserror", "threadpool", + "tokio", "tracing", "tracing-subscriber", "yara", @@ -370,6 +688,30 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -398,7 +740,7 @@ checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "windows-sys", ] @@ -412,6 +754,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -442,6 +796,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -449,6 +818,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -457,12 +827,49 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "futures-sink" version = "0.3.28" @@ -481,8 +888,11 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -500,6 +910,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.28.0" @@ -512,6 +933,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.21" @@ -545,9 +978,9 @@ checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hmac" @@ -644,6 +1077,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -686,9 +1142,30 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" dependencies = [ + "block-padding", "generic-array", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -702,7 +1179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix", + "rustix 0.38.14", "windows-sys", ] @@ -730,6 +1207,37 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lapin" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f3067a1fcfbc3fc46455809c023e69b8f6602463201010f4ae5a3b572adb9dc" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -758,6 +1266,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.7" @@ -779,6 +1293,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "matchers" @@ -865,6 +1382,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -913,7 +1439,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -940,6 +1466,29 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher", + "des", + "getrandom", + "hmac", + "lazy_static", + "rc2", + "sha1", + "yasna", +] + +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -958,7 +1507,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", "windows-targets", ] @@ -1006,7 +1555,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1021,6 +1570,26 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1033,12 +1602,40 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing", +] + [[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + [[package]] name = "proc-macro2" version = "1.0.67" @@ -1056,7 +1653,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", "version_check", "yansi", ] @@ -1076,6 +1673,35 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1168,6 +1794,21 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1182,17 +1823,86 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.38.13" +version = "0.37.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + +[[package]] +name = "rustix" +version = "0.38.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" +checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.7", "windows-sys", ] +[[package]] +name = "rustls" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-connector" +version = "0.18.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "060bcc1795b840d0e56d78f3293be5f652aa1611d249b0e63ffe19f4a8c9ae23" +dependencies = [ + "log", + "rustls", + "rustls-native-certs", + "rustls-webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "ryu" version = "1.0.15" @@ -1214,6 +1924,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -1254,14 +1974,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -1291,9 +2011,9 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures", @@ -1326,6 +2046,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380" +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1337,9 +2066,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -1361,6 +2090,21 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "subtle" version = "2.5.0" @@ -1380,9 +2124,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.33" +version = "2.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" +checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" dependencies = [ "proc-macro2", "quote", @@ -1397,7 +2141,19 @@ checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" dependencies = [ "filetime", "libc", - "xattr", + "xattr 1.0.1", +] + +[[package]] +name = "tcp-stream" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4da30af7998f51ee1aa48ab24276fe303a697b004e31ff542b192c088d5630a5" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", ] [[package]] @@ -1407,17 +2163,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand", - "redox_syscall", - "rustix", + "fastrand 2.0.0", + "redox_syscall 0.3.5", + "rustix 0.38.14", "windows-sys", ] [[package]] name = "termcolor" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" dependencies = [ "winapi-util", ] @@ -1439,7 +2195,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1463,9 +2219,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "serde", @@ -1474,9 +2230,9 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "tinyvec" @@ -1504,11 +2260,25 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.4", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -1521,9 +2291,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -1593,7 +2363,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1643,9 +2413,9 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "uncased" @@ -1677,6 +2447,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.4.1" @@ -1694,6 +2470,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1706,6 +2488,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.1" @@ -1742,7 +2530,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", "wasm-bindgen-shared", ] @@ -1776,7 +2564,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1806,7 +2594,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix", + "rustix 0.38.14", ] [[package]] @@ -1827,9 +2615,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ "winapi", ] @@ -1840,6 +2628,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -1925,6 +2722,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "xattr" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" +dependencies = [ + "libc", +] + [[package]] name = "xattr" version = "1.0.1" @@ -1961,6 +2767,12 @@ dependencies = [ "bindgen", ] +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zip" version = "0.6.6" diff --git a/Cargo.toml b/Cargo.toml index ba90f54..f17d850 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,17 +4,26 @@ version = "0.1.0" edition = "2021" [dependencies] +async-compression = { version = "0.4.1", features = ["gzip", "tokio", "futures-io"] } +async-tar = "0.4.2" +async_zip = "0.0.15" +chrono = "0.4.26" env_logger = "0.10.0" figment = { version = "0.10.10", features = ["env", "toml"] } flate2 = "1.0.27" +futures = "0.3.28" +futures-io = "0.3.28" +lapin = "2.3.1" log = "0.4.20" once_cell = "1.18.0" parking_lot = "0.12.1" reqwest = { version = "0.11.20", features = ["blocking", "json", "serde_json", "gzip"] } serde = { version = "1.0.188", features = ["derive"] } +serde_json = "1.0.107" tar = "0.4.40" thiserror = "1.0.48" threadpool = "1.8.1" +tokio = { version = "1.29.1", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } yara = "0.20.0" diff --git a/src/app_config.rs b/src/app_config.rs index e62a33f..a1c3422 100644 --- a/src/app_config.rs +++ b/src/app_config.rs @@ -19,6 +19,8 @@ pub struct AppConfig { pub username: String, pub password: String, pub max_scan_size: u64, + pub amqp: String, + pub prefetch: u16, } impl Default for AppConfig { @@ -41,6 +43,8 @@ impl Default for AppConfig { bulk_size: 20, load_duration: 60, max_scan_size: 1.28e+8 as u64, // 128 MB + amqp: String::from("amqp://127.0.0.1:5672/%2f"), + prefetch: available_parallelism as u16, } } } diff --git a/src/client.rs b/src/client.rs index 9693095..8a6d7bd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,18 +1,26 @@ mod methods; mod models; +use lapin::{ + message::Delivery, + options::{ + BasicConsumeOptions, BasicPublishOptions, BasicQosOptions, ExchangeDeclareOptions, + QueueBindOptions, QueueDeclareOptions, + }, + types::FieldTable, + BasicProperties, Channel, Connection, Consumer, ExchangeKind, +}; pub use methods::*; pub use models::*; +use tokio::sync::{Mutex, RwLock}; +use tracing::{error, info, instrument, trace, warn}; +use yara::Compiler; -use crate::{error::DragonflyError, APP_CONFIG}; +use crate::{error::DragonflyError, scanner::DistributionFile, APP_CONFIG}; use flate2::read::GzDecoder; -use parking_lot::RwLock; -use reqwest::{blocking::Client, Url}; -use std::{ - io::{Cursor, Read}, - time::Duration, -}; -use tracing::{error, info, trace, warn}; +use futures::StreamExt; +use reqwest::{Client, Url}; +use std::io::{Cursor, Read}; /// Type alias representing a tar archive pub type TarballType = tar::Archive>>; @@ -20,144 +28,295 @@ pub type TarballType = tar::Archive>>; /// Type alias representing a zip archive pub type ZipType = zip::ZipArchive>>; -pub struct AuthState { +pub struct RulesState { + pub compiled_rules: yara::Rules, + pub commit_hash: String, +} + +impl TryFrom for RulesState { + type Error = yara::Error; + + fn try_from(rules_response: RulesResponse) -> Result { + let rules_str = rules_response + .rules + .values() + .map(String::as_ref) + .collect::>() + .join("\n"); + + let compiled_rules = Compiler::new()? + .add_rules_str(&rules_str)? + .compile_rules()?; + let commit_hash = rules_response.hash; + + Ok(RulesState { + compiled_rules, + commit_hash, + }) + } +} + +pub struct AuthenticationState { pub access_token: String, - pub expires_in: u32, + pub expires_in: tokio::time::Duration, } -pub struct RulesState { - pub rules: yara::Rules, - pub hash: String, +impl From for AuthenticationState { + fn from(authentication_response: AuthenticationResponse) -> Self { + let access_token = authentication_response.access_token; + let expires_in = + tokio::time::Duration::from_secs(u64::from(authentication_response.expires_in)); + Self { + access_token, + expires_in, + } + } } -#[warn(clippy::module_name_repetitions)] +#[allow(clippy::module_name_repetitions)] pub struct DragonflyClient { - pub client: Client, - pub authentication_state: RwLock, - pub rules_state: RwLock, + http_client: Client, + consumer: Mutex, + rule_updates_consumer: Mutex, + channel: Channel, + + pub authentication: RwLock, + pub rules: RwLock, } impl DragonflyClient { - pub fn new() -> Result { - let client = Client::builder().gzip(true).build()?; + pub async fn init( + http_client: Client, + amqp_connection: Connection, + ) -> Result { + let channel = amqp_connection.create_channel().await?; + channel + .basic_qos(APP_CONFIG.prefetch, BasicQosOptions::default()) + .await?; + info!("Created channel"); + + channel + .queue_declare( + "jobs", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + info!("Declared incoming jobs queue"); + + channel + .queue_declare( + "results", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + info!("Declared outgoing results queue"); + + channel + .exchange_declare( + "rule_updates", + ExchangeKind::Fanout, + ExchangeDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + info!("Declared rule_updates exchange"); - let auth_response = fetch_access_token(&client)?; - let rules_response = fetch_rules(&client, &auth_response.access_token)?; + let rule_updates_queue = channel + .queue_declare( + "", + QueueDeclareOptions { + auto_delete: true, + durable: false, + exclusive: true, + nowait: false, + passive: false, + }, + FieldTable::default(), + ) + .await?; + info!( + "Declared exlusive rule updates queue (name: {})", + rule_updates_queue.name() + ); - let authentication_state = RwLock::new(AuthState { - access_token: auth_response.access_token, - expires_in: auth_response.expires_in, - }); + channel + .queue_bind( + rule_updates_queue.name().as_str(), + "rule_updates", + "", + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + info!( + "Bound rules update queue (name: {}) to exchange rules_updated", + rule_updates_queue.name() + ); - let rules_state = RwLock::new(RulesState { - rules: rules_response.compile()?, - hash: rules_response.hash, - }); + let rule_updates_consumer = channel + .basic_consume( + rule_updates_queue.name().as_str(), + "rules_update", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + let consumer = channel + .basic_consume( + "jobs", + "incoming_jobs", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + trace!("Performing initial authentication"); + // First fetch an access token + let authentication_response = fetch_access_token(&http_client).await?; + let authentication_state = AuthenticationState::from(authentication_response); + info!("Successfully performed initial authentication"); + + trace!("Fetching initial rules"); + // Then use the access token to fetch and compile YARA rules + let rules_response = fetch_rules(&http_client, &authentication_state.access_token).await?; + let rules_state = RulesState::try_from(rules_response)?; + info!("Successfully fetched initial rules"); Ok(Self { - client, - authentication_state, - rules_state, + http_client, + consumer: Mutex::new(consumer), + rule_updates_consumer: Mutex::new(rule_updates_consumer), + channel, + + authentication: RwLock::new(authentication_state), + + rules: RwLock::new(rules_state), }) } - /// Update the state with a new access token. - /// - /// If an error occurs while reauthenticating, the function retries with an exponential backoff - /// described by the equation `min(10 * 60, 2^(x - 1))` where `x` is the number of failed tries. - pub fn reauthenticate(&self) { - trace!("Waiting for write lock on authentication state"); - let mut state = self.authentication_state.write(); - trace!("Acquired lock"); + /// Receive a delivery from the message queue + pub async fn receive_delivery(&self) -> Result { + match self.consumer.lock().await.next().await { + Some(Ok(delivery)) => Ok(delivery), + Some(Err(err)) => Err(DragonflyError::from(err)), + None => Err(DragonflyError::ConsumerCancelled), + } + } + + /// Receive a delivery from the rule updates queue + pub async fn receive_rule_update(&self) -> Result { + match self.rule_updates_consumer.lock().await.next().await { + Some(Ok(delivery)) => Ok(delivery), + Some(Err(err)) => Err(DragonflyError::from(err)), + None => Err(DragonflyError::ConsumerCancelled), + } + } + + pub async fn push_results( + &self, + results: &SubmitJobResultsSuccess, + ) -> Result<(), DragonflyError> { + let serialized_results = serde_json::to_string(&results)?; + self.channel + .basic_publish( + "", + "results", + BasicPublishOptions::default(), + serialized_results.as_bytes(), + BasicProperties::default(), + ) + .await?; + + Ok(()) + } + + #[instrument(skip(self), name = "reauthenticating")] + pub async fn reauthenticate(&self) { + trace!("Waiting for write lock"); + let mut lock = self.authentication.write().await; + info!("Acquired write lock"); let base = 2_f64; let initial_timeout = 1_f64; let mut tries = 0; let authentication_response = loop { - let r = fetch_access_token(self.get_http_client()); - match r { + match fetch_access_token(&self.http_client).await { Ok(authentication_response) => break authentication_response, Err(e) => { let sleep_time = if tries < 10 { let t = initial_timeout * base.powf(f64::from(tries)); - warn!("Failed to reauthenticate after {tries} tries! Error: {e:#?}. Trying again in {t:.3} seconds"); + warn!("Failed after {tries} tries! Error: {e:#?}. Trying again in {t:.3} seconds"); t } else { - error!("Failed to reauthenticate after {tries} tries! Error: {e:#?}. Trying again in 600.000 seconds"); + error!("Failed after {tries} tries! Error: {e:#?}. Trying again in 600.000 seconds"); 600_f64 }; - std::thread::sleep(Duration::from_secs_f64(sleep_time)); + tokio::time::sleep(std::time::Duration::from_secs_f64(sleep_time)).await; tries += 1; } } }; - trace!("Successfully got new access token!"); + trace!("Got new access token!"); - *state = AuthState { - access_token: authentication_response.access_token, - expires_in: authentication_response.expires_in, - }; + *lock = AuthenticationState::from(authentication_response); info!("Successfully reauthenticated."); } - /// Update the global ruleset. Waits for a write lock. - pub fn update_rules(&self) -> Result<(), DragonflyError> { - let response = fetch_rules( - self.get_http_client(), - &self.authentication_state.read().access_token, - )?; - let mut rules_state = self.rules_state.write(); - rules_state.rules = response.compile()?; - rules_state.hash = response.hash; + #[instrument(skip(self), name = "updating_rules")] + pub async fn update_rules(&self) { + trace!("Waiting for write lock"); + let mut lock = self.rules.write().await; + info!("Acquired write lock"); - Ok(()) - } + let base = 2_f64; + let initial_timeout = 1_f64; + let mut tries = 0; - pub fn bulk_get_job(&self, n_jobs: usize) -> reqwest::Result> { - fetch_bulk_job( - self.get_http_client(), - &self.authentication_state.read().access_token, - n_jobs, - ) - } + info!("Fetching rules"); + let rules_state = loop { + match fetch_rules( + &self.http_client, + &self.authentication.read().await.access_token, + ) + .await + { + Ok(rules_response) => match RulesState::try_from(rules_response) { + Ok(rules_state) => break rules_state, + Err(yara_error) => error!("YARA error: {yara_error}"), + }, + Err(http_err) => error!("HTTP Error: {http_err}"), + }; - /// Report an error to the server. - pub fn send_error(&self, body: &SubmitJobResultsError) -> reqwest::Result<()> { - send_error( - self.get_http_client(), - &self.authentication_state.read().access_token, - body, - ) - } + let sleep_time = if tries < 10 { + initial_timeout * base.powf(f64::from(tries)) + } else { + 600_f64 + }; - /// Submit the results of a scan to the server, given the job and the scan results of each - /// distribution - pub fn send_success(&self, body: &SubmitJobResultsSuccess) -> reqwest::Result<()> { - send_success( - self.get_http_client(), - &self.authentication_state.read().access_token, - body, - ) - } + warn!("Failed in {tries} attempts, trying again in {sleep_time} seconds"); + tokio::time::sleep(std::time::Duration::from_secs_f64(sleep_time)).await; + tries += 1; + }; + info!("Got new rules"); + + *lock = rules_state; - /// Return a reference to the underlying HTTP Client - pub fn get_http_client(&self) -> &Client { - &self.client + info!("Successfully updated rules."); } } -pub fn fetch_tarball( - http_client: &Client, - download_url: &Url, -) -> Result { - let response = http_client.get(download_url.clone()).send()?; +pub fn fetch_tarball(download_url: &Url) -> Result { + let response = reqwest::blocking::get(download_url.clone())?; - let decompressed = GzDecoder::new(response); let mut cursor: Cursor> = Cursor::new(Vec::new()); + let decompressed = GzDecoder::new(response); decompressed .take(APP_CONFIG.max_scan_size) .read_to_end(cursor.get_mut())?; @@ -165,8 +324,8 @@ pub fn fetch_tarball( Ok(tar::Archive::new(cursor)) } -pub fn fetch_zipfile(http_client: &Client, download_url: &Url) -> Result { - let response = http_client.get(download_url.to_string()).send()?; +pub fn fetch_zipfile(download_url: &Url) -> Result { + let response = reqwest::blocking::get(download_url.to_string())?; let mut cursor = Cursor::new(Vec::new()); response @@ -175,3 +334,14 @@ pub fn fetch_zipfile(http_client: &Client, download_url: &Url) -> Result Result { + let parsed_download_url: Url = download_url.parse().unwrap(); + if download_url.ends_with(".tar.gz") { + let tar = fetch_tarball(&parsed_download_url)?; + Ok(DistributionFile::Tar(tar)) + } else { + let zip = fetch_zipfile(&parsed_download_url)?; + Ok(DistributionFile::Zip(zip)) + } +} diff --git a/src/client/methods.rs b/src/client/methods.rs index 11b0ee6..ddef748 100644 --- a/src/client/methods.rs +++ b/src/client/methods.rs @@ -1,11 +1,14 @@ -use super::models; +use reqwest::Client; + +use crate::app_config::APP_CONFIG; -use crate::APP_CONFIG; -use reqwest::blocking::Client; +use super::models; -pub fn fetch_access_token(http_client: &Client) -> reqwest::Result { +pub async fn fetch_access_token( + http_client: &Client, +) -> reqwest::Result { let url = format!("https://{}/oauth/token", APP_CONFIG.auth0_domain); - let json_body = models::AuthBody { + let json_body = models::AuthenticationBody { client_id: &APP_CONFIG.client_id, client_secret: &APP_CONFIG.client_secret, audience: &APP_CONFIG.audience, @@ -17,63 +20,23 @@ pub fn fetch_access_token(http_client: &Client) -> reqwest::Result reqwest::Result> { - http_client - .post(format!("{}/jobs", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .query(&[("batch", n_jobs)]) - .send()? - .error_for_status()? - .json() -} - -pub fn fetch_rules( +pub async fn fetch_rules( http_client: &Client, access_token: &str, ) -> reqwest::Result { http_client .get(format!("{}/rules", APP_CONFIG.base_url)) .header("Authorization", format!("Bearer {access_token}")) - .send()? + .send() + .await? .error_for_status()? .json() -} - -pub fn send_success( - http_client: &Client, - access_token: &str, - body: &models::SubmitJobResultsSuccess, -) -> reqwest::Result<()> { - http_client - .put(format!("{}/package", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .json(&body) - .send()? - .error_for_status()?; - - Ok(()) -} - -pub fn send_error( - http_client: &Client, - access_token: &str, - body: &models::SubmitJobResultsError, -) -> reqwest::Result<()> { - http_client - .put(format!("{}/package", APP_CONFIG.base_url)) - .header("Authorization", format!("Bearer {access_token}")) - .json(&body) - .send()? - .error_for_status()?; - - Ok(()) + .await } diff --git a/src/client/models.rs b/src/client/models.rs index 3d0199b..dc46ed1 100644 --- a/src/client/models.rs +++ b/src/client/models.rs @@ -1,10 +1,5 @@ -use serde::Serialize; -use serde::{self, Deserialize}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::fmt::Display; -use yara::{Compiler, Rules}; - -use crate::error::DragonflyError; #[derive(Debug, Serialize)] pub struct SubmitJobResultsSuccess { @@ -15,31 +10,10 @@ pub struct SubmitJobResultsSuccess { /// Contains all rule identifiers matched for the entire release. pub rules_matched: Vec, - - /// The commit hash of the ruleset used to produce these results. - pub commit: String, -} - -#[derive(Debug, Serialize)] -pub struct SubmitJobResultsError { - pub name: String, - pub version: String, - pub reason: String, -} - -impl Display for SubmitJobResultsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "Name: {}", self.name)?; - writeln!(f, "Version: {}", self.version)?; - writeln!(f, "Reason: {}", self.reason)?; - - Ok(()) - } } #[derive(Debug, Deserialize)] pub struct Job { - pub hash: String, pub name: String, pub version: String, pub distributions: Vec, @@ -51,33 +25,15 @@ pub struct RulesResponse { pub rules: HashMap, } -impl RulesResponse { - /// Compile the rules from the response - pub fn compile(&self) -> Result { - let rules_str = self - .rules - .values() - .map(String::as_ref) - .collect::>() - .join("\n"); - - let compiled_rules = Compiler::new()? - .add_rules_str(&rules_str)? - .compile_rules()?; - - Ok(compiled_rules) - } -} - #[derive(Debug, Deserialize)] -pub struct AuthResponse { +pub struct AuthenticationResponse { pub access_token: String, pub expires_in: u32, pub token_type: String, } #[derive(Debug, Serialize)] -pub struct AuthBody<'a> { +pub struct AuthenticationBody<'a> { pub client_id: &'a str, pub client_secret: &'a str, pub audience: &'a str, diff --git a/src/error.rs b/src/error.rs index 5f34ea3..47ad6eb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,6 +43,27 @@ pub enum DragonflyError { source: ConfigError, }, + #[error("AMQP Client Error: {source:#?}")] + LapinError { + #[from] + source: lapin::Error, + }, + + #[error("Unable to join panicked task: {source:#?}")] + JoinError { + #[from] + source: tokio::task::JoinError, + }, + + #[error("Unable to deserialize JSON: {source:#?}")] + SerdeJsonError { + #[from] + source: serde_json::Error, + }, + + #[error("Consumer cancelled")] + ConsumerCancelled, + #[allow(dead_code)] #[error("Download too large: '{0:#?}'")] DownloadTooLarge(String), diff --git a/src/main.rs b/src/main.rs index dfd64f2..e08310c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,142 +5,151 @@ mod exts; mod scanner; mod utils; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; +use std::time::Instant; +use client::download_distribution; use client::DragonflyClient; use error::DragonflyError; -use reqwest::blocking::Client; -use threadpool::ThreadPool; -use tracing::{debug, error, info, span, trace, Level}; +use tracing::trace_span; +use tracing::Instrument; +use tracing::{error, info, trace}; use tracing_subscriber::EnvFilter; -use yara::Rules; -use crate::{ - app_config::APP_CONFIG, - client::{Job, SubmitJobResultsError}, - scanner::{scan_all_distributions, PackageScanResults}, +use lapin::{ + message::Delivery, + options::{BasicAckOptions, BasicRejectOptions}, + Connection, ConnectionProperties, }; +use reqwest::Client; +use scanner::{Distribution, DistributionScanResults}; +use tracing::info_span; +use tracing::warn; +use utils::create_inspector_url; + +use crate::{app_config::APP_CONFIG, client::Job, scanner::PackageScanResults}; + +fn runner(client: &DragonflyClient, job: Job) -> Result { + let mut distribution_scan_results: Vec = Vec::new(); + for download_url in &job.distributions { + let _span = trace_span!( + "Scanner", + distribution = download_url.split('/').last().unwrap_or("/") + ) + .entered(); + trace!("Downloading distribution"); + let file = download_distribution(download_url)?; + trace!("Successfully downloaded distribution"); + let inspector_url = create_inspector_url( + &job.name, + &job.version, + &download_url.clone().parse().unwrap(), + ); + let mut distribution = Distribution { + file, + inspector_url, + }; + trace!("Blocking until rules are available"); + let rules_state = client.rules.blocking_read(); + trace!("Got read lock on rules, starting scan"); + let start = Instant::now(); + let distribution_scan_result = distribution.scan(&rules_state.compiled_rules)?; + let end = Instant::now(); + let duration = end - start; + trace!( + "Finished scan! Took {}s {}ms ", + duration.as_secs(), + duration.subsec_millis() + ); + + distribution_scan_results.push(distribution_scan_result); + } -/// The actual scanning logic. -/// -/// Takes the job to be scanned, the compiled rules, the commit hash being used, and the HTTP -/// client (for downloading distributions), and returns the `PackageScanResults`. -fn scanner( - http_client: &Client, - job: &Job, - rules: &Rules, - commit_hash: &str, -) -> Result { - let distribution_scan_results = scan_all_distributions(http_client, rules, job)?; - - let package_scan_result = PackageScanResults::new( - job.name.to_string(), - job.version.to_string(), + Ok(PackageScanResults { + name: job.name, + version: job.version, distribution_scan_results, - commit_hash.to_string(), - ); - - Ok(package_scan_result) + }) } -/// The job to run in the threadpool. -fn runner(client: &DragonflyClient, job: Job) { - let span = span!(Level::INFO, "Job", name = job.name, version = job.version); - let _enter = span.enter(); - let rules_state = client.rules_state.read(); - let send_result = match scanner( - client.get_http_client(), - &job, - &rules_state.rules, - &rules_state.hash, - ) { - Ok(package_scan_results) => { - let body = package_scan_results.build_body(); - - client.send_success(&body) - } - - Err(err) => { - let body = SubmitJobResultsError { - name: job.name, - version: job.version, - reason: format!("{err}"), - }; - - client.send_error(&body) - } - }; - - if let Err(err) = send_result { - error!("Error while sending response to API: {err}"); +async fn handle_delivery( + client: Arc, + delivery: &Delivery, +) -> Result<(), DragonflyError> { + let job: Job = serde_json::from_slice(&delivery.data)?; + let span = info_span!("Job", name = &job.name, version = &job.version); + async move { + trace!("Spawning blocking scanner job in threadpool"); + let results = tokio::task::spawn_blocking({ + let client = Arc::clone(&client); + move || runner(&client, job) + }) + .await??; + trace!("Scanner job in threadpool finished"); + trace!("Pushing results onto results queue"); + client.push_results(&results.build_body()).await?; + trace!("Ack'ing original delivery"); + delivery.ack(BasicAckOptions::default()).await?; + info!("Acknowledged delivery"); + + Ok(()) } + .instrument(span) + .await } -fn main() -> Result<(), DragonflyError> { +#[tokio::main] +async fn main() -> Result<(), DragonflyError> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); - let client = Arc::new(DragonflyClient::new()?); + let http_client = Client::new(); - // We spawn `n_jobs` threads using a threadpool for processing jobs - let n_jobs = APP_CONFIG.threads; - let pool = ThreadPool::new(n_jobs); - debug!("Started threadpool with {} workers", n_jobs); + let connection = Connection::connect(&APP_CONFIG.amqp, ConnectionProperties::default()).await?; + info!("Established connection to AMQP server"); + let client = Arc::new(DragonflyClient::init(http_client, connection).await?); - // Spawning the authentication thread - std::thread::spawn({ + tokio::spawn({ let client = Arc::clone(&client); - trace!("Starting access token refresh thread"); - move || loop { - trace!( - "Waiting for read lock on authentication state to determine how long to sleep for" - ); - let expires_in = client.authentication_state.read().expires_in - 10; - trace!("Got read lock on authentication state"); - info!("Will reauthenticate in {expires_in} seconds"); - std::thread::sleep(Duration::from_secs(u64::from(expires_in))); - - info!("Reauthenticating"); - client.reauthenticate(); + async move { + loop { + let sleep = client.authentication.read().await.expires_in + - tokio::time::Duration::from_secs(10); + tokio::time::sleep(sleep).await; + client.reauthenticate().await; + } } }); - loop { - info!("Fetching {} bulk jobs...", APP_CONFIG.bulk_size); - match client.bulk_get_job(APP_CONFIG.bulk_size) { - Ok(jobs) => { - if jobs.is_empty() { - debug!("Bulk job request returned no jobs"); + tokio::spawn({ + let client = Arc::clone(&client); + async move { + loop { + match client.receive_rule_update().await { + Ok(_) => client.update_rules().await, + Err(err) => error!("Error from rule updates queue: {err}"), } + } + } + }); - info!("Successfully fetched {} jobs", jobs.len()); - - for job in jobs { - info!("Submitting {} v{} for execution", job.name, job.version); - let rules_state = client.rules_state.read(); - if job.hash != rules_state.hash { - info!( - "Must update rules, updating from {} to {}", - rules_state.hash, job.hash - ); - drop(rules_state); - if let Err(err) = client.update_rules() { - error!("Error while updating rules: {err}"); + loop { + trace!("Waiting for message"); + match client.receive_delivery().await { + Ok(delivery) => { + trace!("Message received"); + let client = Arc::clone(&client); + tokio::task::spawn(async move { + if let Err(err) = handle_delivery(Arc::clone(&client), &delivery).await { + error!("Error while scanning: {err}"); + match delivery.reject(BasicRejectOptions { requeue: false }).await { + Ok(()) => warn!("Rejected message"), + Err(err) => error!("Error while rejecting: {err}"), } } - - pool.execute({ - let client = Arc::clone(&client); - move || runner(&client, job) - }); - } - - trace!("Finished loading jobs into queue!"); + }); } - - Err(err) => error!("Unexpected HTTP error: {err}"), + Err(err) => error!("Error while consuming: {err}"), } - - std::thread::sleep(Duration::from_secs(APP_CONFIG.load_duration)); } } diff --git a/src/scanner.rs b/src/scanner.rs index 32a57fa..7900c60 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -4,14 +4,13 @@ use std::{ path::{Path, PathBuf}, }; -use reqwest::{blocking::Client, Url}; +use reqwest::Url; use yara::Rules; use crate::{ - client::{fetch_tarball, fetch_zipfile, Job, SubmitJobResultsSuccess, TarballType, ZipType}, + client::{SubmitJobResultsSuccess, TarballType, ZipType}, error::DragonflyError, exts::RuleExt, - utils::create_inspector_url, }; #[derive(Debug, Hash, Eq, PartialEq)] @@ -39,7 +38,7 @@ impl FileScanResult { } /// Scan an archive format using Yara rules. -trait Scan { +pub trait Scan { fn scan(&mut self, rules: &Rules) -> Result, DragonflyError>; } @@ -75,15 +74,23 @@ impl Scan for ZipType { } } +pub enum DistributionFile { + Zip(ZipType), + Tar(TarballType), +} + /// A distribution consisting of an archive and an inspector url. -struct Distribution { - file: Box, - inspector_url: Url, +pub struct Distribution { + pub file: DistributionFile, + pub inspector_url: Url, } impl Distribution { - fn scan(&mut self, rules: &Rules) -> Result { - let results = self.file.scan(rules)?; + pub fn scan(&mut self, rules: &Rules) -> Result { + let results = match &mut self.file { + DistributionFile::Zip(zip_archive) => zip_archive.scan(rules), + DistributionFile::Tar(tar_archive) => tar_archive.scan(rules), + }?; Ok(DistributionScanResults::new( results, @@ -160,28 +167,14 @@ impl DistributionScanResults { } } +#[derive(Debug)] pub struct PackageScanResults { pub name: String, pub version: String, pub distribution_scan_results: Vec, - pub commit_hash: String, } impl PackageScanResults { - pub fn new( - name: String, - version: String, - distribution_scan_results: Vec, - commit_hash: String, - ) -> Self { - Self { - name, - version, - distribution_scan_results, - commit_hash, - } - } - /// Format the package scan results into something that can be sent over the API pub fn build_body(&self) -> SubmitJobResultsSuccess { let highest_score_distribution = self @@ -212,39 +205,10 @@ impl PackageScanResults { score, inspector_url, rules_matched, - commit: self.commit_hash.clone(), } } } -/// Scan all the distributions of the given job against the given ruleset -/// -/// Uses the provided HTTP client to download each distribution. -pub fn scan_all_distributions( - http_client: &Client, - rules: &Rules, - job: &Job, -) -> Result, DragonflyError> { - let mut distribution_scan_results = Vec::with_capacity(job.distributions.len()); - for distribution in &job.distributions { - let download_url: Url = distribution.parse().unwrap(); - let inspector_url = create_inspector_url(&job.name, &job.version, &download_url); - - let mut dist = Distribution { - file: if distribution.ends_with(".tar.gz") { - Box::new(fetch_tarball(http_client, &download_url)?) - } else { - Box::new(fetch_zipfile(http_client, &download_url)?) - }, - inspector_url, - }; - let distribution_scan_result = dist.scan(rules)?; - distribution_scan_results.push(distribution_scan_result); - } - - Ok(distribution_scan_results) -} - /// Scan a file given it implements `Read`. /// /// # Arguments From adc285e4609f7c3428ccffec3c3b87904ddd9ad4 Mon Sep 17 00:00:00 2001 From: Robin5605 Date: Mon, 25 Sep 2023 13:30:41 -0500 Subject: [PATCH 2/2] Use tracing's instrument macro --- src/main.rs | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index e08310c..9f54519 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,7 @@ use utils::create_inspector_url; use crate::{app_config::APP_CONFIG, client::Job, scanner::PackageScanResults}; +#[tracing::instrument(skip_all, fields(name = job.name, version = job.version))] fn runner(client: &DragonflyClient, job: Job) -> Result { let mut distribution_scan_results: Vec = Vec::new(); for download_url in &job.distributions { @@ -72,30 +73,32 @@ fn runner(client: &DragonflyClient, job: Job) -> Result, delivery: &Delivery, ) -> Result<(), DragonflyError> { + trace!("Parsing delivery for job"); let job: Job = serde_json::from_slice(&delivery.data)?; - let span = info_span!("Job", name = &job.name, version = &job.version); - async move { - trace!("Spawning blocking scanner job in threadpool"); - let results = tokio::task::spawn_blocking({ - let client = Arc::clone(&client); - move || runner(&client, job) - }) - .await??; - trace!("Scanner job in threadpool finished"); - trace!("Pushing results onto results queue"); - client.push_results(&results.build_body()).await?; - trace!("Ack'ing original delivery"); - delivery.ack(BasicAckOptions::default()).await?; - info!("Acknowledged delivery"); - - Ok(()) - } - .instrument(span) - .await + trace!("Successfully parsed delivery"); + + tracing::Span::current().record("name", &job.name); + tracing::Span::current().record("version", &job.version); + + trace!("Spawning blocking scanner job in threadpool"); + let results = tokio::task::spawn_blocking({ + let client = Arc::clone(&client); + move || runner(&client, job) + }) + .await??; + trace!("Scanner job in threadpool finished"); + trace!("Pushing results onto results queue"); + client.push_results(&results.build_body()).await?; + trace!("Ack'ing original delivery"); + delivery.ack(BasicAckOptions::default()).await?; + info!("Acknowledged delivery"); + + Ok(()) } #[tokio::main]