diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index b86dfd4..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,32 +0,0 @@ -# Changelog - -All notable changes to this project will be documented in this file. - -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - -## [Unreleased] - -### Added - -- `report::Event` and `report::EventType` to measure the result of async operation execution -- `report::RealtimeStatus` trait for monitoring in realtime how many active operations and connections are concurrently - utilized. -- `report::RealtimeReporter` trait for building own listener on open connections and operations -- `report::RealtimeReport` default concurrent lightweight implementation for `report::RealtimeReporter` - and `report::RealtimeStatus` traits -- `report::EventProcessorBuilder` and `report::EventProcessor` traits for processing timings of operations -- `report::AggregagteEventProcessorBuilder` and `report::AggregagteEventProcessor` implementation that aggregates - timings into `HdrHistogram` and `AggregateEvent` timeline -- `executor::ExecutionStep` trait for implementing async load test scenario steps with combinators like - `executor::NoopStep`, `executor::ClosureStep`, `executor::SequenceStep` that allows to build scenarios in simple dsl -- `executor::ScenarioBuilder` and `executor::Scenario` trait for creating load testing scenarios - with `executor::StepScenarioBuilder` and `executor::StepScenario` based on `ExecutionStep` functionality. -- `executor::Limiter` trait for creating control structures that can throttle or terminate test run based - on `report::RealtimeStatus` results. -- `executor::MaxDurationLimiter` limiter that terminates load test after specified duration. -- `executor::MaxOperationsLimiter` limiter that terminates load test after specified number of operations finished. -- `executor::ConcurrencyLimiter` limiter that throttles operations when too concurrent operations reach limit. -- `executor::MeasuredFuture` for capturing execution events of the async tasks. - -[Unreleased]: https://github.com/EcomDev/profusion-rs/compare/3077010...HEAD \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4b6e2f5..367af9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,9 +40,9 @@ checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "autocfg" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" [[package]] name = "backtrace" @@ -67,9 +67,9 @@ checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bumpalo" -version = "3.15.4" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "byteorder" @@ -85,9 +85,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.90" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" [[package]] name = "cfg-if" @@ -124,9 +124,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.3" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", ] @@ -236,9 +236,9 @@ checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" [[package]] name = "either" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" [[package]] name = "flate2" @@ -332,9 +332,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "half" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5eceaaeec696539ddaf7b333340f1af35a5aa87ae3e4f3ead0532f72affab2e" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", @@ -391,9 +391,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "js-sys" @@ -446,9 +446,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "minimal-lexical" @@ -533,9 +533,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -573,9 +573,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" dependencies = [ "unicode-ident", ] @@ -589,27 +589,48 @@ dependencies = [ "itertools 0.12.1", "loom", "pin-project-lite", + "profusion-macros", "rustc-hash", + "serde", + "serde_test", "thiserror", "tokio", "tracing", "trait-variant", ] +[[package]] +name = "profusion-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "profusion", + "quote", + "syn", +] + +[[package]] +name = "profusion-tests" +version = "0.1.0" +dependencies = [ + "profusion", + "tokio", +] + [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] [[package]] name = "rayon" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -634,7 +655,7 @@ dependencies = [ "aho-corasick", "memchr", "regex-automata 0.4.6", - "regex-syntax 0.8.2", + "regex-syntax 0.8.3", ] [[package]] @@ -654,7 +675,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.2", + "regex-syntax 0.8.3", ] [[package]] @@ -665,9 +686,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "rustc-demangle" @@ -683,9 +704,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustversion" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" [[package]] name = "ryu" @@ -730,15 +751,24 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.114" +version = "1.0.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd" dependencies = [ "itoa", "ryu", "serde", ] +[[package]] +name = "serde_test" +version = "1.0.176" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a2f49ace1498612d14f7e0b8245519584db8299541dfe31a06374a828d620ab" +dependencies = [ + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -756,9 +786,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "syn" -version = "2.0.53" +version = "2.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" +checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" dependencies = [ "proc-macro2", "quote", @@ -807,9 +837,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "num_cpus", @@ -1032,7 +1062,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -1052,17 +1082,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -1073,9 +1104,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -1085,9 +1116,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -1097,9 +1128,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -1109,9 +1146,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -1121,9 +1158,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -1133,9 +1170,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -1145,6 +1182,6 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/Cargo.toml b/Cargo.toml index 59836ad..45cfbef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,8 @@ -[package] -name = "profusion" -version = "0.1.0" -authors = ["Ivan Chepurnyi "] -edition = "2021" -repository = "https://github.com/EcomDev/profusion-rs" -license = "MIT" -rust-version = "1.77.0" - -[lib] -name = "profusion" -crate-type = ["lib"] - -[dependencies] -tokio = { version = "1", features = ["rt", "time", "macros", "test-util"] } -pin-project-lite = "0.2" -hdrhistogram = "7" -thiserror = "1" -rustc-hash = "1.1.0" -trait-variant = "0.1.2" -tracing = "0.1.40" - -[dev-dependencies] -loom = "0.7" -itertools = "0.12" -criterion = { version = "0.5", features = ["default", "async_tokio"] } -tokio = { version = "1", features = ["rt", "time", "macros", "rt-multi-thread", "test-util"] } +[workspace] +resolver = "2" +members = [ + "profusion", "profusion-macros", "profusion-tests" +] [profile.release] opt-level = 3 @@ -34,10 +12,3 @@ lto = true opt-level = 0 codegen-units = 1 lto = false - -[[bench]] -name = "aggregator_storage" -harness = false - -[features] -test_util = [] \ No newline at end of file diff --git a/profusion-macros/CHANGELOG.md b/profusion-macros/CHANGELOG.md new file mode 100644 index 0000000..1a77b9c --- /dev/null +++ b/profusion-macros/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## [Unreleased] + +### Added + +[Unreleased]: https://github.com/EcomDev/profusion-rs/compare/3077010...HEAD \ No newline at end of file diff --git a/profusion-macros/Cargo.toml b/profusion-macros/Cargo.toml new file mode 100644 index 0000000..2b26186 --- /dev/null +++ b/profusion-macros/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "profusion-macros" +version = "0.1.0" +authors = ["Ivan Chepurnyi "] +edition = "2021" +repository = "https://github.com/EcomDev/profusion-rs" +license = "MIT" +rust-version = "1.77.0" + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = { version = "~1.0" } +syn = { version = "~2.0", features = ["full"] } +quote = { version = "~1.0" } + +[dev-dependencies] +profusion = { version = "0.1.0", path = "../profusion", features = ["full"] } \ No newline at end of file diff --git a/profusion-macros/src/lib.rs b/profusion-macros/src/lib.rs new file mode 100644 index 0000000..2dcc5ff --- /dev/null +++ b/profusion-macros/src/lib.rs @@ -0,0 +1,22 @@ +use proc_macro::TokenStream; + +use quote::ToTokens; +use syn::parse_macro_input; + +use crate::load_test_function::LoadTestFunction; +use crate::scenario::Scenario; +use crate::scenario_arguments::ScenarioArguments; + +mod load_test_function; +mod scenario; +mod scenario_arguments; + +#[proc_macro_attribute] +pub fn scenario(args: TokenStream, item: TokenStream) -> TokenStream { + let scenario = Scenario::new( + parse_macro_input!(args as ScenarioArguments), + parse_macro_input!(item as LoadTestFunction), + ); + + scenario.to_token_stream().into() +} diff --git a/profusion-macros/src/load_test_function.rs b/profusion-macros/src/load_test_function.rs new file mode 100644 index 0000000..b53ca09 --- /dev/null +++ b/profusion-macros/src/load_test_function.rs @@ -0,0 +1,520 @@ +use proc_macro2::TokenStream; +use quote::quote; +use syn::{ + AssocType, + FnArg, + GenericArgument, Ident, ItemFn, parse::{Parse, ParseStream}, Pat, Path, PathArguments, PathSegment, PatType, + ReturnType, Signature, spanned::Spanned, TraitBound, Type, TypeParamBound, TypePath, +}; + +use crate::scenario_arguments::ScenarioArguments; + +#[derive(Debug, Eq, PartialEq)] +pub(crate) struct LoadTestFunction { + function: ItemFn, + state: Vec<(Ident, Path)>, + metric_type: Type, +} + +impl Parse for LoadTestFunction { + fn parse(input: ParseStream) -> syn::Result { + let function = input.parse::()?; + + if function.sig.asyncness.is_none() { + return Err(syn::Error::new( + function.sig.span(), + "scenario must be async function", + )); + } + + let (state, metric_type) = process_function_input(&function.sig)?; + + validate_return_type(&function.sig.output)?; + + Ok(Self { + function, + state, + metric_type, + }) + } +} + +impl LoadTestFunction { + fn builder_definition(&self, name: &Ident) -> TokenStream { + quote! { + struct #name; + } + } + + fn scenario_definition(&self, name: &Ident) -> TokenStream { + let state: Vec<_> = self + .state + .iter() + .map(|(ident, path)| { + quote! { + #ident: #path, + } + }) + .collect(); + + if !self.state.is_empty() { + return quote! { + struct #name { #( #state )* } + }; + } + + quote! { struct #name; } + } + + fn scenario_default(&self, name: &Ident) -> TokenStream { + let state: Vec<_> = self + .state + .iter() + .map(|(ident, _)| { + quote! { + #ident: Default::default(), + } + }) + .collect(); + + if !self.state.is_empty() { + return quote! { + #name { #( #state )* } + }; + } + + quote! { + #name + } + } + + fn builder_impl(&self, name: &Ident, builder_name: &Ident) -> TokenStream { + let build_creator = self.scenario_default(name); + let metric = &self.metric_type; + + quote! { + impl profusion::prelude::ScenarioBuilder<#metric> for #builder_name { + type Scenario = #name; + + fn build(&self) -> Self::Scenario { + #build_creator + } + } + } + } + + fn scenario_impl(&self, name: &Ident) -> TokenStream { + let metric = &self.metric_type; + let state: Vec<_> = self + .state + .iter() + .map(|(ident, _)| { + quote! { + &mut self.#ident + } + }) + .collect(); + + let function_name = &self.function.sig.ident; + + quote! { + impl profusion::prelude::Scenario<#metric> for #name { + async fn execute( + &mut self, + aggregate: &mut MetricMeasurer< impl MetricAggregate > + ) -> Result<(), MetricRecordError> { + #function_name(aggregate #(, #state )*).await + } + } + } + } + + pub(crate) fn generate(&self, arguments: &ScenarioArguments) -> TokenStream { + let builder_definition = self.builder_definition(arguments.builder_name()); + let builder_impl = self.builder_impl(arguments.name(), arguments.builder_name()); + let scenario_definition = self.scenario_definition(arguments.name()); + let scenario_impl = self.scenario_impl(arguments.name()); + let function_definition = &self.function; + + quote! { + #builder_definition + #scenario_definition + #builder_impl + #scenario_impl + #function_definition + } + } +} + +fn process_function_input(signature: &Signature) -> Result<(Vec<(Ident, Path)>, Type), syn::Error> { + let metric_type = match signature.inputs.first() { + Some(FnArg::Typed(path)) + if is_of_type(&path.ty, "MetricMeasurer") && is_mutable_reference(&path.ty) => + { + extract_last_path_segment(&path.ty) + } + _ => None, + }; + + let metric_type = match metric_type { + Some(PathSegment { + arguments: PathArguments::AngleBracketed(arguments), + .. + }) => match arguments.args.first().map(extract_measurer_aggregate_metric) { + Some(Some(Ok(path))) => path, + Some(Some(Err(err))) => return Err(err), + _ => { + return Err(syn::Error::new( + signature.inputs.span(), + "missing required `impl MetricAggregate` in `MetricMeasurer`", + )) + } + }, + Some(PathSegment { + arguments: PathArguments::None, + .. + }) => { + return Err(syn::Error::new( + signature.inputs.span(), + "missing required `impl MetricAggregate` in `MetricMeasurer`", + )) + } + _ => return Err(syn::Error::new( + signature.inputs.span(), + "missing required `&mut profusion::prelude::MetricMeasurer` as first function argument", + )), + }; + + let mut types = Vec::new(); + + for item in signature.inputs.iter().skip(1) { + match item { + FnArg::Typed(PatType { ty, pat, .. }) => { + match ( + extract_type_path(ty), + *(pat.clone()), + is_mutable_reference(ty), + ) { + (Some(type_name), Pat::Ident(ident), true) => { + types.push((ident.ident, type_name.path.clone())) + } + _ => return Err(syn::Error::new( + item.span(), + "state for load tests should be a &mut type that implements Default trait", + )), + } + } + _ => continue, + } + } + + Ok((types, metric_type)) +} + +fn is_of_type(item_type: &Type, expected: &str) -> bool { + extract_type_path(item_type).map_or(false, |path| is_path_of_type(&path.path, expected)) +} + +fn is_path_of_type(path: &Path, expected: &str) -> bool { + matches!( + path.segments.last(), + Some(path) if path.ident.eq(&Ident::new(expected, path.ident.span())) + ) +} + +fn extract_type_path(item_type: &Type) -> Option<&TypePath> { + match item_type { + Type::Path(item) => Some(item), + Type::Reference(reference) => extract_type_path(&reference.elem), + _ => None, + } +} + +fn extract_measurer_aggregate_metric( + argument: &GenericArgument, +) -> Option> { + match argument { + GenericArgument::Type(Type::ImplTrait(trait_impl)) => match trait_impl.bounds.first() { + Some(TypeParamBound::Trait(TraitBound { path, .. })) + if is_path_of_type(path, "MetricAggregate") => + { + match path.segments.last()? { + PathSegment { + arguments: PathArguments::AngleBracketed(args), + .. + } => match args.args.first()? { + GenericArgument::AssocType(AssocType { ident, ty, .. }) + if ident.eq(&Ident::new("Metric", ident.span())) => + { + Some(Ok(ty.clone())) + } + _ => Some(Err(syn::Error::new( + args.args.first()?.span(), + "missing required `T` of in `impl MetricAggregate`", + ))), + }, + _ => None, + } + } + _ => None, + }, + _ => None, + } +} + +fn extract_last_path_segment(item_type: &Type) -> Option<&PathSegment> { + match extract_type_path(item_type)?.path.segments.last() { + Some(path) => Some(path), + _ => None, + } +} + +fn is_mutable_reference(item_type: &Type) -> bool { + matches!(item_type, Type::Reference(reference) if reference.mutability.is_some()) +} + +fn validate_return_type(result: &ReturnType) -> Result<(), syn::Error> { + if let ReturnType::Type(_, return_type) = result { + if is_of_type(return_type, "Result") { + return Ok(()); + } + }; + + Err(syn::Error::new( + result.span(), + "invalid return type, it should be std::result::Result with `profusion::prelude::MetricRecordError` as Error", + )) +} + +#[cfg(test)] +mod load_test_function_tests { + use quote::quote; + use syn::parse2; + + use super::*; + + #[test] + fn returns_error_on_non_async_function() { + let stream = quote! { + fn load_test(measurer: &mut MetricMeasurer>) + -> Result<(), MetricRecordError> { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } + + #[test] + fn returns_error_on_missing_measurer_argument() { + let stream = quote! { + async fn load_test() -> Result<(), MetricRecordError> { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } + + #[test] + fn returns_error_on_empty_metric() { + let stream = quote! { + async fn load_test( + measurer: &mut MetricMeasurer + ) -> Result<(), MetricRecordError> { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } + + #[test] + fn returns_error_not_mut_arguments() { + let stream = quote! { + async fn load_test( + measurer: &mut profusion::prelude::MetricMeasurer, + state: &mut State, + state_two: State2 + ) -> Result<(), MetricRecordError> where A: Instance { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } + + #[test] + fn returns_error_when_not_result_return_type() { + let stream = quote! { + async fn load_test( + measurer: &mut profusion::prelude::MetricMeasurer, + state: &mut State, + state_two: State2 + ) where A: Instance { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } + + #[test] + fn returns_error_when_wrong_result_error_returned() { + let stream = quote! { + async fn load_test( + measurer: &mut profusion::prelude::MetricMeasurer, + state: &mut State, state_two: State2 + ) -> Result<(), Test> where A: Instance { + + } + }; + + let result = parse2::(stream); + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod process_function_arguments_tests { + use quote::quote; + use quote::ToTokens; + use syn::{parse2, Signature}; + + use super::*; + + #[test] + fn first_argument_of_function_is_correct() { + let (state, metric_type) = process_function_input( + &parse2::( + quote! { async fn load_test(measurer: &mut MetricMeasurer>) }, + ) + .unwrap(), + ) + .unwrap(); + + let ident = metric_type.to_token_stream(); + + assert_eq!(state.len(), 0); + assert_eq!(ident.to_string(), quote! { Test }.to_string()); + } + + #[test] + fn error_when_first_argument_of_function_is_missing() { + let result = process_function_input( + &parse2::(quote! { async fn load_test() }).unwrap(), + ); + + assert!(result.is_err()); + } + + #[test] + fn error_when_first_argument_of_function_is_not_mutable_ref() { + let result = process_function_input( + &parse2::(quote! { async fn load_test(measurer: MetricMeasurer) }) + .unwrap(), + ); + + assert!(result.is_err()); + } + + #[test] + fn error_when_first_argument_function_is_not_mutable() { + let result = process_function_input( + &parse2::(quote! { async fn load_test(measurer: &mut State) }).unwrap(), + ); + + assert!(result.is_err()); + } + + #[test] + fn error_when_other_arguments_are_not_mutable_references() { + let result = process_function_input( + &parse2::(quote! { + async fn load_test( + measurer: &mut MetricMeasurer, + state_one: &mut item_crate::StateOne, + state_two: item_crate::StateTwo, + ) + }) + .unwrap(), + ); + + assert!(result.is_err()); + } + + #[test] + fn returns_all_state_arguments() { + let (result, _metric_type) = process_function_input( + &parse2::(quote! { + async fn load_test( + measurer: &mut MetricMeasurer>, + state_one: &mut item_crate::StateOne, + state_two: &mut item_crate::StateTwo, + state_three: &mut StateThree + ) + }) + .unwrap(), + ) + .unwrap(); + + assert_eq!(result[0].0, Ident::new("state_one", result[0].0.span())); + assert_eq!(result[1].0, Ident::new("state_two", result[0].0.span())); + assert_eq!(result[2].0, Ident::new("state_three", result[0].0.span())); + } +} + +#[cfg(test)] +mod validate_result_type_tests { + use quote::quote; + use syn::parse2; + + use super::*; + + #[test] + fn accepts_full_metric_error_in_result() { + let return_type = validate_return_type( + &parse2::(quote! { -> Result<(), profusion::prelude::MetricRecordError> }) + .unwrap(), + ); + + assert!(return_type.is_ok()); + } + + #[test] + fn accepts_simplified_metric_error_in_result() { + let return_type = validate_return_type( + &parse2::(quote! { -> Result<(), MetricRecordError> }).unwrap(), + ); + + assert!(return_type.is_ok()); + } + + #[test] + fn accepts_any_error_result_that_converts_to_metric_error() { + let return_type = validate_return_type( + &parse2::(quote! { -> Result<(), std::io::Error> }).unwrap(), + ); + + assert!(return_type.is_ok()); + } + + #[test] + fn does_not_accept_wrong_return_type() { + let return_type = validate_return_type(&parse2::(quote! { -> () }).unwrap()); + + assert!(return_type.is_err()); + } + + #[test] + fn does_not_accept_empty_return_type() { + let return_type = validate_return_type(&parse2::(quote! {}).unwrap()); + + assert!(return_type.is_err()); + } +} diff --git a/profusion-macros/src/scenario.rs b/profusion-macros/src/scenario.rs new file mode 100644 index 0000000..36d98f3 --- /dev/null +++ b/profusion-macros/src/scenario.rs @@ -0,0 +1,122 @@ +use proc_macro2::TokenStream; +use quote::ToTokens; + +use crate::load_test_function::LoadTestFunction; +use crate::scenario_arguments::ScenarioArguments; + +pub(crate) struct Scenario { + scenario_arguments: ScenarioArguments, + function: LoadTestFunction, +} + +impl Scenario { + pub(crate) fn new(scenario_arguments: ScenarioArguments, function: LoadTestFunction) -> Self { + Self { + scenario_arguments, + function, + } + } +} + +impl ToTokens for Scenario { + fn to_tokens(&self, tokens: &mut TokenStream) { + self.function.generate(&self.scenario_arguments).to_tokens(tokens); + } +} + +#[cfg(test)] +mod tests { + use quote::quote; + use syn::parse2; + + use super::*; + + #[test] + fn generates_stateless_code() { + let scenario = Scenario::new( + parse2(quote! { MyLoadTest }).unwrap(), + parse2( + quote! { async fn test_function(report: &mut MetricMeasurer>) -> Result<(), Error> {}}, + ) + .unwrap(), + ); + + assert_eq!( + scenario.to_token_stream().to_string(), + quote! { + struct MyLoadTest; + struct MyLoadTestScenario; + + impl profusion::prelude::ScenarioBuilder<&'static str> for MyLoadTest { + type Scenario = MyLoadTestScenario; + fn build(& self) -> Self::Scenario { + MyLoadTestScenario + } + } + + impl profusion::prelude::Scenario<&'static str> for MyLoadTestScenario { + async fn execute( + &mut self, + aggregate: &mut MetricMeasurer > + ) -> Result<(), MetricRecordError> { + test_function(aggregate).await + } + } + + async fn test_function(report: &mut MetricMeasurer >) + -> Result<(), Error> { + + } + } + .to_string() + ); + } + + #[test] + fn generates_stateful_code() { + let scenario = Scenario::new( + parse2(quote! { MyLoadTest }).unwrap(), + parse2( + quote! { + async fn test_function(report: &mut MetricMeasurer>, state_one: &mut StateOne, state_two: &mut StateTwo,) -> Result<(), Error> {}}, + ) + .unwrap(), + ); + + assert_eq!( + scenario.to_token_stream().to_string(), + quote! { + struct MyLoadTest; + struct MyLoadTestScenario { + state_one: StateOne, + state_two: StateTwo, + } + + impl profusion::prelude::ScenarioBuilder<&'static str> for MyLoadTest { + type Scenario = MyLoadTestScenario; + fn build(& self) -> Self::Scenario { + MyLoadTestScenario { + state_one: Default::default(), + state_two: Default::default(), + } + } + } + + impl profusion::prelude::Scenario<&'static str> for MyLoadTestScenario { + async fn execute( + &mut self, + aggregate: &mut MetricMeasurer > + ) -> Result<(), MetricRecordError> { + test_function(aggregate, &mut self.state_one, &mut self.state_two).await + } + } + + async fn test_function(report: &mut MetricMeasurer >, state_one: &mut StateOne, state_two: &mut StateTwo, ) + -> Result<(), Error> { + + } + } + .to_string() + ); + } +} diff --git a/profusion-macros/src/scenario_arguments.rs b/profusion-macros/src/scenario_arguments.rs new file mode 100644 index 0000000..2626541 --- /dev/null +++ b/profusion-macros/src/scenario_arguments.rs @@ -0,0 +1,99 @@ +use quote::format_ident; +use syn::{Error, Ident, Token}; +use syn::parse::{Parse, ParseStream}; + +#[derive(Debug)] +pub(crate) struct ScenarioArguments { + name: Ident, + builder_name: Ident, +} + +impl Parse for ScenarioArguments { + fn parse(input: ParseStream) -> syn::Result { + let builder_name = input + .parse::() + .map_err(|e| Error::new(e.span(), "missing struct name for scenario"))?; + + let mut name = format_ident!("{builder_name}Scenario"); + + if input.peek(Token![,]) { + input.parse::()?; + name = input.parse::()?; + } + + if !input.is_empty() { + return Err(Error::new( + input.span(), + "There should be only two attributes", + )); + } + + Ok(Self { name, builder_name }) + } +} + +impl ScenarioArguments { + pub(crate) fn name(&self) -> &Ident { + &self.name + } + + pub(crate) fn builder_name(&self) -> &Ident { + &self.builder_name + } +} + +#[cfg(test)] +mod test { + use quote::quote; + use syn::parse2; + + use super::*; + + #[test] + fn empty_arguments_return_error() { + let arguments = parse2::(quote! {}); + + assert!(arguments.is_err()); + } + + #[test] + fn missing_second_struct_name_gets_gets_handled() { + let arguments = parse2::(quote! { ItemName, }); + assert!(arguments.is_err()); + } + + #[test] + fn too_many_arguments_result_in_error() { + let arguments = parse2::(quote! { ItemName, ItemNameBuilder, TooMuch }); + + assert!(arguments.is_err()); + print!("{}", arguments.unwrap_err().to_string()); + } + + #[test] + fn creates_args_with_default_builder_suffix() { + let arguments = parse2::(quote! { ItemName }).unwrap(); + assert_eq!( + arguments.builder_name, + Ident::new("ItemName", arguments.builder_name.span()) + ); + assert_eq!( + arguments.name, + Ident::new("ItemNameScenario", arguments.name.span()) + ); + } + + #[test] + fn creates_args_with_custom_scenario_suffix() { + let arguments = + parse2::(quote! { ItemName, ItemNameCustomScenario }).unwrap(); + assert_eq!( + arguments.name, + Ident::new("ItemNameCustomScenario", arguments.name.span()) + ); + assert_eq!( + arguments.builder_name, + Ident::new("ItemName", arguments.builder_name.span()) + ); + } +} diff --git a/profusion-tests/Cargo.toml b/profusion-tests/Cargo.toml new file mode 100644 index 0000000..dffd2e3 --- /dev/null +++ b/profusion-tests/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "profusion-tests" +version = "0.1.0" +authors = ["Ivan Chepurnyi "] +edition = "2021" +repository = "https://github.com/EcomDev/profusion-rs" +license = "MIT" +rust-version = "1.77.0" +publish = false + +[dependencies] +profusion = { version = "0.1.0", path = "../profusion", features = ["full"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } \ No newline at end of file diff --git a/profusion-tests/src/lib.rs b/profusion-tests/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/profusion-tests/src/lib.rs @@ -0,0 +1 @@ + diff --git a/profusion-tests/tests/macro_scenario.rs b/profusion-tests/tests/macro_scenario.rs new file mode 100644 index 0000000..b77dd0a --- /dev/null +++ b/profusion-tests/tests/macro_scenario.rs @@ -0,0 +1,44 @@ +use std::time::Duration; + +use profusion::prelude::*; +use profusion::scenario; + +#[derive(Default)] +struct State(usize); + +impl State { + fn incr(&mut self) { + self.0 += 1; + } + + fn value(&self) -> u64 { + self.0 as u64 + } +} + +#[scenario(LoadTestOne)] +async fn load_test_items( + reporter: &mut MetricMeasurer>, +) -> Result<(), MetricRecordError> { + reporter.measure("load_test_one", async { Ok(()) }).await? +} + +#[scenario(LoadTestTwo)] +async fn load_test_items_with_state( + reporter: &mut MetricMeasurer>, + state_one: &mut State, +) -> Result<(), MetricRecordError> { + reporter.add_measurement( + "load_test_one::one", + Duration::from_millis(state_one.value()), + None, + ); + reporter.measure("load_test_two", async { Ok(state_one.incr()) }).await? +} + +#[tokio::test] +async fn stateless_scenario() -> Result<(), MetricRecordError> { + let item = LoadTestOne; + + Ok(()) +} diff --git a/profusion/CHANGELOG.md b/profusion/CHANGELOG.md new file mode 100644 index 0000000..c46dcbe --- /dev/null +++ b/profusion/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added + +[Unreleased]: https://github.com/EcomDev/profusion-rs/compare/3077010...HEAD \ No newline at end of file diff --git a/profusion/Cargo.toml b/profusion/Cargo.toml new file mode 100644 index 0000000..4816f9b --- /dev/null +++ b/profusion/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "profusion" +version = "0.1.0" +authors = ["Ivan Chepurnyi "] +edition = "2021" +repository = "https://github.com/EcomDev/profusion-rs" +license = "MIT" +rust-version = "1.77.0" + +[lib] +name = "profusion" +crate-type = ["lib"] + +[dependencies] +tokio = { version = "1", features = ["rt", "time", "macros", "test-util"] } +pin-project-lite = "0.2" +hdrhistogram = "7" +thiserror = "1" +rustc-hash = "1.1.0" +trait-variant = "0.1.2" +tracing = "0.1.40" +serde = { version = "1", features = ["derive"], optional = true } +profusion-macros = { version = "~0.1.0", path = "../profusion-macros", optional = true } + +[dev-dependencies] +loom = "0.7" +itertools = "0.12" +criterion = { version = "0.5", features = ["default", "async_tokio"] } +tokio = { version = "1", features = ["rt", "time", "macros", "rt-multi-thread", "test-util"] } +serde_test = { version = "1" } + +[[bench]] +name = "aggregator_storage" +harness = false + +[features] +test_util = [] +macros = ["profusion-macros"] +full = ["test_util", "macros"] + +[package.metadata.docs.rs] +all-features = true \ No newline at end of file diff --git a/benches/aggregator_storage.rs b/profusion/benches/aggregator_storage.rs similarity index 100% rename from benches/aggregator_storage.rs rename to profusion/benches/aggregator_storage.rs diff --git a/src/aggregate/mod.rs b/profusion/src/aggregate/mod.rs similarity index 100% rename from src/aggregate/mod.rs rename to profusion/src/aggregate/mod.rs diff --git a/src/aggregate/scale.rs b/profusion/src/aggregate/scale.rs similarity index 100% rename from src/aggregate/scale.rs rename to profusion/src/aggregate/scale.rs diff --git a/src/aggregate/settings.rs b/profusion/src/aggregate/settings.rs similarity index 100% rename from src/aggregate/settings.rs rename to profusion/src/aggregate/settings.rs diff --git a/src/aggregate/storage/combined.rs b/profusion/src/aggregate/storage/combined.rs similarity index 70% rename from src/aggregate/storage/combined.rs rename to profusion/src/aggregate/storage/combined.rs index 757448e..0c5ac23 100644 --- a/src/aggregate/storage/combined.rs +++ b/profusion/src/aggregate/storage/combined.rs @@ -1,23 +1,21 @@ use crate::aggregate::AggregateStorage; -use crate::metric::Metric; pub struct CombinedAggregateStorage(L, R); -impl Clone for CombinedAggregateStorage +impl Clone for CombinedAggregateStorage where - L: AggregateStorage, - R: AggregateStorage, + L: AggregateStorage, + R: AggregateStorage, { fn clone(&self) -> Self { Self(self.0.clone(), self.1.clone()) } } -impl CombinedAggregateStorage +impl CombinedAggregateStorage where - L: AggregateStorage, - R: AggregateStorage, - T: Metric, + L: AggregateStorage, + R: AggregateStorage, { pub fn new(left: L, right: R) -> Self { Self(left, right) @@ -28,24 +26,22 @@ where } } -impl Default for CombinedAggregateStorage +impl Default for CombinedAggregateStorage where - L: AggregateStorage + Default, - R: AggregateStorage + Default, - T: Metric, + L: AggregateStorage + Default, + R: AggregateStorage + Default, { fn default() -> Self { Self(L::default(), R::default()) } } -impl AggregateStorage for CombinedAggregateStorage +impl AggregateStorage for CombinedAggregateStorage where - L: AggregateStorage, - R: AggregateStorage, - T: Metric, + L: AggregateStorage, + R: AggregateStorage, { - type Metric = T; + type Metric = L::Metric; #[inline] fn record(&mut self, metric: Self::Metric, latency_value: u64) { @@ -60,9 +56,7 @@ where #[cfg(test)] mod tests { - use crate::aggregate::storage::{AggregateStorage, TotalAggregateStorage}; - - use super::*; + use crate::prelude::*; #[derive(Eq, PartialEq, Hash, Copy, Clone)] enum TestMetric { @@ -78,8 +72,7 @@ mod tests { #[test] fn stores_values_into_each_storage() { - let mut storage = - TotalAggregateStorage::default().and(TotalAggregateStorage::default()); + let mut storage = TotalAggregateStorage::default().and(TotalAggregateStorage::default()); storage.record(TestMetric::One, 100); storage.record(TestMetric::Two, 20); @@ -101,8 +94,7 @@ mod tests { #[test] fn merges_from_all_storages() { - let mut one = - TotalAggregateStorage::default().and(TotalAggregateStorage::default()); + let mut one = TotalAggregateStorage::default().and(TotalAggregateStorage::default()); let (mut two, three) = (one.clone(), one.clone()); diff --git a/src/aggregate/storage/hashmap.rs b/profusion/src/aggregate/storage/metric.rs similarity index 90% rename from src/aggregate/storage/hashmap.rs rename to profusion/src/aggregate/storage/metric.rs index bad15ed..56ee005 100644 --- a/src/aggregate/storage/hashmap.rs +++ b/profusion/src/aggregate/storage/metric.rs @@ -1,3 +1,5 @@ +use std::fmt::{Debug, Formatter}; + use hdrhistogram::{CreationError, Histogram}; pub use rustc_hash::FxHashMap; use tracing::error; @@ -10,6 +12,17 @@ pub struct MetricAggregateStorage { proto: Histogram, } +impl Debug for MetricAggregateStorage +where + T: Metric + Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("MetricAggregateStorage { storage: {")?; + self.inner.iter().fmt(f)?; + f.write_str("} }") + } +} + impl Default for MetricAggregateStorage where T: Metric + Send, @@ -44,8 +57,8 @@ where }) } - pub fn value(&self, metric: T) -> &Histogram { - &self.inner.get(&metric).unwrap_or(&self.proto) + pub(crate) fn value(&self, metric: T) -> &Histogram { + self.inner.get(&metric).unwrap_or(&self.proto) } } @@ -106,7 +119,10 @@ mod tests { impl Metric for TestMetric { fn name(&self) -> &str { - "metric" + match self { + Self::One => "metric_one", + Self::Two => "metric_two", + } } } diff --git a/src/aggregate/storage/mod.rs b/profusion/src/aggregate/storage/mod.rs similarity index 91% rename from src/aggregate/storage/mod.rs rename to profusion/src/aggregate/storage/mod.rs index 1521ecd..da23a87 100644 --- a/src/aggregate/storage/mod.rs +++ b/profusion/src/aggregate/storage/mod.rs @@ -1,11 +1,11 @@ pub use combined::*; -pub use hashmap::*; +pub use metric::*; pub use total::*; use crate::metric::Metric; mod combined; -mod hashmap; +mod metric; mod total; /// Storage for aggregation of metric values @@ -27,6 +27,7 @@ pub trait AggregateStorage: Clone + Default + Send { /// * `other`: other storage of the same type fn merge(self, other: Self) -> Self; + /// Chains another storage to store metric values in fn and(self, other: O) -> CombinedAggregateStorage where O: AggregateStorage, diff --git a/src/aggregate/storage/total.rs b/profusion/src/aggregate/storage/total.rs similarity index 92% rename from src/aggregate/storage/total.rs rename to profusion/src/aggregate/storage/total.rs index 917890b..7549277 100644 --- a/src/aggregate/storage/total.rs +++ b/profusion/src/aggregate/storage/total.rs @@ -48,8 +48,8 @@ where }) } - pub fn value(self) -> Histogram { - self.inner + pub fn value(&self) -> &Histogram { + &self.inner } } @@ -137,12 +137,12 @@ mod tests { two.record(TestMetric::Two, 50); two.record(TestMetric::One, 50); - let value = three.merge(two.merge(one)).value(); + let merged = three.merge(two.merge(one)); - assert_eq!(value.len(), 6); - assert_eq!(value.max(), 200); - assert_eq!(value.min(), 20); - assert_eq!(value.count_at(50), 3) + assert_eq!(merged.value().len(), 6); + assert_eq!(merged.value().max(), 200); + assert_eq!(merged.value().min(), 20); + assert_eq!(merged.value().count_at(50), 3) } #[test] diff --git a/src/aggregate/test_aggregate.rs b/profusion/src/aggregate/test_aggregate.rs similarity index 100% rename from src/aggregate/test_aggregate.rs rename to profusion/src/aggregate/test_aggregate.rs diff --git a/profusion/src/aggregate/timeline/aggregate.rs b/profusion/src/aggregate/timeline/aggregate.rs new file mode 100644 index 0000000..32bf0a2 --- /dev/null +++ b/profusion/src/aggregate/timeline/aggregate.rs @@ -0,0 +1,505 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; + +use crate::metric::MetricRecordError; +use crate::prelude::*; + +struct Counter(Arc); + +impl Counter { + fn new() -> Self { + Self(Arc::new(AtomicUsize::new(0))) + } + + fn increment(&self) { + self.0.fetch_add(1, Ordering::Relaxed); + } + + fn decrement(&self) { + self.0.fetch_sub(1, Ordering::Relaxed); + } + + fn current(&self) -> usize { + self.0.load(Ordering::Relaxed) + } +} + +impl Clone for Counter { + fn clone(&self) -> Self { + self.increment(); + Self(self.0.clone()) + } +} + +impl Drop for Counter { + fn drop(&mut self) { + self.decrement() + } +} + +pub struct TimelineAggregateBuilder { + settings: AggregateSettings, + storage: S, + users: Counter, +} + +pub struct TimelineAggregate { + settings: AggregateSettings, + timeline: Vec>, + storage: S, + total: TimelineItem, + users: Counter, +} + +impl TimelineAggregateBuilder +where + S: AggregateStorage, + S::Metric: Sync, +{ + pub fn new(storage: S) -> Self { + Self::with_settings(storage, AggregateSettings::default()) + } + + pub fn with_settings(storage: S, settings: AggregateSettings) -> Self { + Self { + settings, + storage, + users: Counter::new(), + } + } +} + +impl MetricAggregateBuilder for TimelineAggregateBuilder +where + S: AggregateStorage, + S::Metric: Sync, +{ + type Reporter = TimelineAggregate; + + fn build(&self) -> Self::Reporter { + TimelineAggregate { + timeline: Vec::new(), + storage: self.storage.clone(), + total: TimelineItem::new( + self.settings.zero().window(self.settings.window()), + self.storage.clone(), + 0, + 0, + ), + settings: self.settings, + users: self.users.clone(), + } + } +} + +impl MetricAggregate for TimelineAggregate +where + S: AggregateStorage, +{ + type Metric = S::Metric; + + fn add_entry( + &mut self, + metric: Self::Metric, + latency: Duration, + error: Option<&MetricRecordError>, + ) { + let time_window = self.settings.zero().window(self.settings.window()); + let item = match self.timeline.last_mut() { + Some(item) if item.time().eq(&time_window) => item, + _ => { + let position = self.timeline.len(); + self.timeline.push(TimelineItem::new( + time_window, + self.storage.clone(), + 0, + 0, + )); + &mut self.timeline[position] + } + }; + + let latency = self.settings.scale().duration_to_value(latency); + item.record(metric, latency); + item.update_counters(error, self.users.current()); + self.total.record(metric, latency); + self.total.update_counters(error, self.users.current()); + } + + fn merge_into(self, other: &mut Self) { + for item in self.timeline.into_iter() { + match other.timeline.binary_search(&item) { + Ok(position) => { + item.merge_into(&mut other.timeline[position]); + } + Err(position) => other.timeline.insert(position, item), + } + } + } +} + +impl TimelineAggregate +where + S: AggregateStorage, +{ + pub fn flush(self) -> (TimelineItem, Vec>) { + (self.total, self.timeline) + } +} + +impl TimelineAggregate> +where + L: AggregateStorage, + R: AggregateStorage, +{ + pub fn split(self) -> (TimelineAggregate, TimelineAggregate) { + let (left_storage, right_storage) = self.storage.unwrap(); + let (left_total, right_total) = self.total.split(); + + let (left_timeline, right_timeline) = + self.timeline.into_iter().map(|item| item.split()).unzip(); + + ( + TimelineAggregate { + users: self.users.clone(), + settings: self.settings, + total: left_total, + storage: left_storage, + timeline: left_timeline, + }, + TimelineAggregate { + users: self.users.clone(), + settings: self.settings, + total: right_total, + storage: right_storage, + timeline: right_timeline, + }, + ) + } +} + +#[cfg(test)] +mod tests { + use tokio::time::advance; + + use super::*; + + #[derive(Ord, PartialOrd, Eq, PartialEq, Hash, Copy, Clone)] + enum ReportMetric { + One, + Two, + } + + impl Metric for ReportMetric { + fn name(&self) -> &str { + "report_metric" + } + } + + enum Action { + Wait(Duration), + Add(ReportMetric, Duration), + Error(ReportMetric, Duration, MetricRecordError), + } + + async fn populate_test_metric( + reporter: &mut TimelineAggregate, + metrics: Vec, + ) where + S: AggregateStorage, + { + for action in metrics { + match action { + Action::Wait(duration) => advance(duration).await, + Action::Add(metric, latency) => reporter.add_entry(metric, latency, None), + Action::Error(metric, latency, error) => { + reporter.add_entry(metric, latency, Some(&error)) + } + } + } + } + + #[tokio::test(start_paused = true)] + async fn aggregates_values_per_each_time_window() { + let builder = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default(), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Milliseconds), + ); + let mut reporter = builder.build(); + + populate_test_metric( + &mut reporter, + vec![ + Action::Add(ReportMetric::One, Duration::from_millis(10)), + Action::Error( + ReportMetric::Two, + Duration::from_millis(10), + MetricRecordError::Timeout(Duration::from_millis(10)), + ), + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::Two, Duration::from_millis(20)), + Action::Wait(Duration::from_millis(51)), + Action::Add(ReportMetric::One, Duration::from_millis(40)), + Action::Wait(Duration::from_millis(151)), + Action::Add(ReportMetric::One, Duration::from_millis(60)), + ], + ) + .await; + + verify_timeline( + vec![ + (Duration::from_millis(0), (10, 10), 1, 1), + (Duration::from_millis(200), (0, 20), 0, 1), + (Duration::from_millis(300), (40, 0), 0, 1), + (Duration::from_millis(400), (60, 0), 0, 1), + ], + reporter.flush().1, + ); + } + + #[tokio::test(start_paused = true)] + async fn merges_aggregated_values_per_each_time_window() { + let builder = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default(), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Milliseconds), + ); + + let mut reporter_one = builder.build(); + let mut reporter_two = builder.build(); + + populate_test_metric( + &mut reporter_one, + vec![Action::Add(ReportMetric::One, Duration::from_millis(10))], + ) + .await; + + populate_test_metric( + &mut reporter_two, + vec![Action::Error( + ReportMetric::Two, + Duration::from_millis(10), + MetricRecordError::Timeout(Duration::from_millis(10)), + )], + ) + .await; + + populate_test_metric( + &mut reporter_one, + vec![ + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::Two, Duration::from_millis(20)), + ], + ) + .await; + + populate_test_metric( + &mut reporter_one, + vec![ + Action::Wait(Duration::from_millis(51)), + Action::Add(ReportMetric::One, Duration::from_millis(40)), + ], + ) + .await; + + let mut aggregated = builder.build(); + reporter_one.merge_into(&mut aggregated); + reporter_two.merge_into(&mut aggregated); + + verify_timeline( + vec![ + (Duration::from_millis(0), (10, 10), 1, 2), + (Duration::from_millis(200), (0, 20), 0, 2), + (Duration::from_millis(300), (40, 0), 0, 2), + ], + aggregated.flush().1, + ); + } + + #[tokio::test(start_paused = true)] + async fn allows_splitting_chained_storage_in_timeline() { + let builder = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default() + .and(MetricAggregateStorage::with_sigfig(1).unwrap()), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Microseconds), + ); + + let mut aggregate = builder.build(); + + populate_test_metric( + &mut aggregate, + vec![ + Action::Add(ReportMetric::One, Duration::from_millis(10)), + Action::Add(ReportMetric::Two, Duration::from_millis(20)), + ], + ) + .await; + + let (one, two) = aggregate.split(); + + verify_timeline( + vec![(Duration::new(0, 0), (10000, 20000), 0, 1)], + one.flush().1, + ); + verify_timeline( + vec![(Duration::new(0, 0), (9728, 19456), 0, 1)], + two.flush().1, + ); + } + + #[tokio::test(start_paused = true)] + async fn collects_total_values_for_all_timeline() { + let mut aggregate = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default(), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Microseconds), + ) + .build(); + + populate_test_metric( + &mut aggregate, + vec![ + Action::Add(ReportMetric::One, Duration::from_millis(10)), + Action::Add(ReportMetric::Two, Duration::from_millis(20)), + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::One, Duration::from_millis(100)), + Action::Add(ReportMetric::Two, Duration::from_millis(230)), + Action::Wait(Duration::from_millis(100)), + Action::Add(ReportMetric::One, Duration::from_millis(400)), + Action::Add(ReportMetric::Two, Duration::from_millis(230)), + Action::Wait(Duration::from_millis(100)), + Action::Add(ReportMetric::One, Duration::from_millis(100)), + Action::Add(ReportMetric::Two, Duration::from_millis(2030)), + ], + ) + .await; + + let storage = aggregate.flush().0; + assert_eq!(storage.storage().value(ReportMetric::One).len(), 4); + assert_eq!(storage.storage().value(ReportMetric::Two).len(), 4); + + assert_eq!(storage.storage().value(ReportMetric::One).min(), 10000); + assert_eq!(storage.storage().value(ReportMetric::Two).min(), 20000); + } + + #[tokio::test(start_paused = true)] + async fn counts_errors_and_users_in_total() { + let builder = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default(), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Microseconds), + ); + let mut aggregate = builder.build(); + let (_other, _another) = (builder.build(), builder.build()); + + populate_test_metric( + &mut aggregate, + vec![ + Action::Error( + ReportMetric::One, + Duration::from_millis(10), + MetricRecordError::Timeout(Duration::from_millis(10)), + ), + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::Two, Duration::from_millis(230)), + ], + ) + .await; + + let total = aggregate.flush().0; + assert_eq!(total.users(), 3); + assert_eq!(total.errors(), 1); + } + + #[tokio::test(start_paused = true)] + async fn reduces_users_count_on_removal_of_aggregators() { + let builder = TimelineAggregateBuilder::with_settings( + MetricAggregateStorage::default(), + AggregateSettings::default() + .with_window(Duration::from_millis(100)) + .with_scale(AggregateScale::Milliseconds), + ); + let mut aggregate = builder.build(); + let (other, _another) = (builder.build(), builder.build()); + + populate_test_metric( + &mut aggregate, + vec![ + Action::Add(ReportMetric::One, Duration::from_millis(10)), + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::Two, Duration::from_millis(230)), + ], + ) + .await; + + drop(other); + populate_test_metric( + &mut aggregate, + vec![ + Action::Wait(Duration::from_millis(200)), + Action::Add(ReportMetric::Two, Duration::from_millis(230)), + ], + ) + .await; + + verify_timeline( + vec![ + (Duration::from_millis(0), (10, 0), 0, 3), + (Duration::from_millis(200), (0, 230), 0, 3), + (Duration::from_millis(400), (0, 230), 0, 2), + ], + aggregate.flush().1, + ); + } + + fn verify_timeline( + expected_values: Vec<(Duration, (u64, u64), usize, usize)>, + result: Vec>>, + ) { + assert_eq!( + result.len(), + expected_values.len(), + "Number of timeline values does not match" + ); + for ( + item, + ( + expected_time, + (expected_metric_one, expected_metric_two), + expected_errors, + expected_users, + ), + ) in result.into_iter().zip(expected_values) + { + assert_eq!(*item.time(), expected_time, "Time window does not match"); + assert_eq!( + item.storage().value(ReportMetric::One).min(), + expected_metric_one, + "Minimum metric one does not match" + ); + assert_eq!( + item.storage().value(ReportMetric::Two).min(), + expected_metric_two, + "Minimum metric two does not match" + ); + assert_eq!( + item.errors(), + expected_errors, + "Number of errors does not match" + ); + assert_eq!( + item.users(), + expected_users, + "Number of users does not match" + ); + } + } +} diff --git a/profusion/src/aggregate/timeline/item.rs b/profusion/src/aggregate/timeline/item.rs new file mode 100644 index 0000000..e76bf41 --- /dev/null +++ b/profusion/src/aggregate/timeline/item.rs @@ -0,0 +1,140 @@ +/* + * Copyright © 2024. EcomDev B.V. + * All rights reserved. + * See LICENSE for license details. + */ + +use std::cmp::max; +use std::time::Duration; + +use crate::aggregate::{AggregateStorage, CombinedAggregateStorage}; +use crate::metric::MetricRecordError; + +#[derive(Debug)] +pub struct TimelineItem { + time: Duration, + storage: S, + errors: usize, + users: usize, +} + +impl Eq for TimelineItem {} + +impl PartialEq for TimelineItem { + fn eq(&self, other: &Self) -> bool { + self.time.eq(&other.time) + } +} + +impl PartialOrd for TimelineItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimelineItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.time.cmp(&other.time) + } +} + +impl TimelineItem +where + S: AggregateStorage, +{ + pub(crate) fn new(time: Duration, storage: S, errors: usize, users: usize) -> Self { + Self { + time, + storage, + errors, + users, + } + } + + pub fn time(&self) -> &Duration { + &self.time + } + + pub(crate) fn storage(&self) -> &S { + &self.storage + } + + pub fn errors(&self) -> usize { + self.errors + } + + pub fn users(&self) -> usize { + self.users + } + + pub(crate) fn record(&mut self, metric: S::Metric, value: u64) { + self.storage.record(metric, value) + } + + pub(crate) fn update_counters( + &mut self, + error: Option<&MetricRecordError>, + users: usize, + ) { + if error.is_some() { + self.errors += 1; + } + + self.users = users + } + + pub(crate) fn merge_into(self, other: &mut Self) { + let storage = std::mem::take(&mut other.storage); + other.storage = storage.merge(self.storage); + other.users = max(other.users, self.users); + other.errors += self.errors; + } +} + +impl TimelineItem> +where + L: AggregateStorage, + R: AggregateStorage, +{ + pub fn split(self) -> (TimelineItem, TimelineItem) { + let (left_storage, right_storage) = self.storage.unwrap(); + ( + TimelineItem { + time: self.time, + storage: left_storage, + errors: self.errors, + users: self.users, + }, + TimelineItem { + storage: right_storage, + time: self.time, + errors: self.errors, + users: self.users, + }, + ) + } +} + +#[cfg(test)] +mod tests { + use crate::prelude::*; + + use super::*; + + #[test] + fn splits_into_multiple_storages() { + let mut item = TimelineItem::new( + Duration::from_millis(10), + MetricAggregateStorage::default().and(TotalAggregateStorage::default()), + 0, + 1, + ); + + item.record("one", 100); + + let (left, right) = item.split(); + + assert_eq!(left.storage().value("one").max(), 100); + assert_eq!(right.storage().value().max(), 100); + } +} diff --git a/profusion/src/aggregate/timeline/metric.rs b/profusion/src/aggregate/timeline/metric.rs new file mode 100644 index 0000000..6489d3e --- /dev/null +++ b/profusion/src/aggregate/timeline/metric.rs @@ -0,0 +1,143 @@ +use crate::aggregate::MetricAggregateStorage; +use crate::metric::Metric; + +use super::TimelineItem; + +impl TimelineItem> +where + T: Metric + Send, +{ + pub fn min_value(&self, metric: T) -> u64 { + self.storage().value(metric).min() + } + + pub fn max_value(&self, metric: T) -> u64 { + self.storage().value(metric).max() + } + + pub fn mean_value(&self, metric: T) -> f64 { + self.storage().value(metric).mean() + } + + pub fn percentile_value>(&self, metric: T, percentile: P) -> u64 { + self.storage().value(metric).value_at_percentile(percentile.into()) + } + + pub fn histogram(&self, metric: T) -> Vec<(u64, f64, u64)> { + let histogram = self.storage().value(metric); + let total_counts = histogram.len(); + histogram + .iter_log(1, 2.0) + .map(|value| { + ( + value.value_iterated_to() + 1, + (value.count_since_last_iteration() as f64 / total_counts as f64) * 100.0, + value.count_since_last_iteration(), + ) + }) + .collect::>() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + fn populate_timeline_item() -> TimelineItem> { + let mut item = TimelineItem::new( + Duration::from_millis(10), + MetricAggregateStorage::default(), + 0, + 1, + ); + + item.record("one", 50); + item.record("two", 100); + item.record("one", 200); + item.record("one", 400); + item.record("one", 800); + item.record("two", 1600); + item.record("one", 3200); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 12800); + item + } + + #[test] + fn calculates_minimum_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.min_value("one"), 50); + assert_eq!(item.min_value("two"), 100); + } + + #[test] + fn calculates_maximum_per_metric() { + let item = populate_timeline_item(); + + assert_eq!(item.max_value("one"), 12807); + assert_eq!(item.max_value("two"), 1600); + } + + #[test] + fn calculates_mean_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.mean_value("one"), 4306.3); + assert_eq!(item.mean_value("two"), 850.0); + } + + #[test] + fn calculates_percentiles_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.percentile_value("one", 90), 6403); + assert_eq!(item.percentile_value("one", 50), 3201); + assert_eq!(item.percentile_value("two", 90), 1600); + assert_eq!(item.percentile_value("two", 50), 100); + } + + #[test] + fn returns_log_histogram_per_metric() { + let item = populate_timeline_item(); + assert_eq!( + item.histogram("one"), + vec![ + (1, 0.0, 0), + (2, 0.0, 0), + (4, 0.0, 0), + (8, 0.0, 0), + (16, 0.0, 0), + (32, 0.0, 0), + (64, 10.0, 1), + (128, 0.0, 0), + (256, 10.0, 1), + (512, 10.0, 1), + (1024, 10.0, 1), + (2048, 0.0, 0), + (4096, 10.0, 1), + (8192, 40.0, 4), + (16384, 10.0, 1) + ] + ); + assert_eq!( + item.histogram("two"), + vec![ + (1, 0.0, 0), + (2, 0.0, 0), + (4, 0.0, 0), + (8, 0.0, 0), + (16, 0.0, 0), + (32, 0.0, 0), + (64, 0.0, 0), + (128, 50.0, 1), + (256, 0.0, 0), + (512, 0.0, 0), + (1024, 0.0, 0), + (2048, 50.0, 1) + ] + ); + } +} diff --git a/profusion/src/aggregate/timeline/mod.rs b/profusion/src/aggregate/timeline/mod.rs new file mode 100644 index 0000000..c608efe --- /dev/null +++ b/profusion/src/aggregate/timeline/mod.rs @@ -0,0 +1,8 @@ +pub use aggregate::*; +pub use item::*; + +mod item; +mod metric; + +mod aggregate; +mod total; diff --git a/profusion/src/aggregate/timeline/total.rs b/profusion/src/aggregate/timeline/total.rs new file mode 100644 index 0000000..07a00cf --- /dev/null +++ b/profusion/src/aggregate/timeline/total.rs @@ -0,0 +1,121 @@ +use crate::metric::Metric; +use crate::prelude::TotalAggregateStorage; + +use super::TimelineItem; + +impl TimelineItem> +where + T: Metric + Send, +{ + pub fn min_value(&self) -> u64 { + self.storage().value().min() + } + + pub fn max_value(&self) -> u64 { + self.storage().value().max() + } + + pub fn mean_value(&self) -> f64 { + self.storage().value().mean() + } + + pub fn percentile_value>(&self, percentile: P) -> u64 { + self.storage().value().value_at_percentile(percentile.into()) + } + + pub fn histogram(&self) -> Vec<(u64, f64, u64)> { + let histogram = self.storage().value(); + let total_counts = histogram.len(); + histogram + .iter_log(1, 2.0) + .map(|value| { + ( + value.value_iterated_to() + 1, + (value.count_since_last_iteration() as f64 / total_counts as f64) * 100.0, + value.count_since_last_iteration(), + ) + }) + .collect::>() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + fn populate_timeline_item() -> TimelineItem> { + let mut item = TimelineItem::new( + Duration::from_millis(10), + TotalAggregateStorage::default(), + 0, + 1, + ); + + item.record("one", 50); + item.record("two", 100); + item.record("one", 200); + item.record("one", 400); + item.record("one", 800); + item.record("two", 1600); + item.record("one", 3200); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 6400); + item.record("one", 12800); + item + } + + #[test] + fn calculates_minimum_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.min_value(), 50); + } + + #[test] + fn calculates_maximum_per_metric() { + let item = populate_timeline_item(); + + assert_eq!(item.max_value(), 12807); + } + + #[test] + fn calculates_mean_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.mean_value(), 3730.25); + } + + #[test] + fn calculates_percentiles_per_metric() { + let item = populate_timeline_item(); + assert_eq!(item.percentile_value(90), 6403); + assert_eq!(item.percentile_value(50), 1600); + } + + #[test] + fn returns_log_histogram_per_metric() { + let item = populate_timeline_item(); + assert_eq!( + item.histogram(), + vec![ + (1, 0.0, 0), + (2, 0.0, 0), + (4, 0.0, 0), + (8, 0.0, 0), + (16, 0.0, 0), + (32, 0.0, 0), + (64, 8.333333333333332, 1), + (128, 8.333333333333332, 1), + (256, 8.333333333333332, 1), + (512, 8.333333333333332, 1), + (1024, 8.333333333333332, 1), + (2048, 8.333333333333332, 1), + (4096, 8.333333333333332, 1), + (8192, 33.33333333333333, 4), + (16384, 8.333333333333332, 1) + ] + ); + } +} diff --git a/src/lib.rs b/profusion/src/lib.rs similarity index 82% rename from src/lib.rs rename to profusion/src/lib.rs index 45f635b..42d4a9b 100644 --- a/src/lib.rs +++ b/profusion/src/lib.rs @@ -1,5 +1,8 @@ //- +#[cfg(feature = "macros")] +pub use profusion_macros::*; + pub mod aggregate; pub mod measurer; pub mod metric; diff --git a/src/measurer.rs b/profusion/src/measurer.rs similarity index 100% rename from src/measurer.rs rename to profusion/src/measurer.rs diff --git a/src/metric/error.rs b/profusion/src/metric/error.rs similarity index 100% rename from src/metric/error.rs rename to profusion/src/metric/error.rs diff --git a/src/metric/mod.rs b/profusion/src/metric/mod.rs similarity index 100% rename from src/metric/mod.rs rename to profusion/src/metric/mod.rs diff --git a/profusion/src/scenario/mod.rs b/profusion/src/scenario/mod.rs new file mode 100644 index 0000000..c1e130b --- /dev/null +++ b/profusion/src/scenario/mod.rs @@ -0,0 +1,49 @@ +use crate::aggregate::MetricAggregate; +use crate::measurer::MetricMeasurer; +use crate::metric::MetricRecordError; +use crate::prelude::Metric; + +pub trait ScenarioBuilder +where + T: Metric, +{ + type Scenario: Scenario; + + fn build(&self) -> Self::Scenario; +} + +#[allow(async_fn_in_trait)] +pub trait Scenario +where + T: Metric, +{ + async fn execute( + &mut self, + aggregate: &mut MetricMeasurer>, + ) -> Result<(), MetricRecordError>; +} + +#[cfg(test)] +mod tests { + use super::*; + + struct TestScenario; + + #[derive(Eq, PartialEq, Hash, Clone, Copy)] + struct TestMetric; + + impl Metric for TestMetric { + fn name(&self) -> &str { + "test" + } + } + + impl Scenario for TestScenario { + async fn execute( + &mut self, + aggregate: &mut MetricMeasurer>, + ) -> Result<(), MetricRecordError> { + aggregate.measure(TestMetric, async {}).await + } + } +} diff --git a/src/start_time.rs b/profusion/src/start_time.rs similarity index 100% rename from src/start_time.rs rename to profusion/src/start_time.rs diff --git a/rustfmt.toml b/rustfmt.toml index 42a5768..e676bc1 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,9 @@ -max_width = 90 +max_width = 100 +array_width = 80 +attr_fn_like_width = 90 +chain_width = 80 +comment_width = 90 format_strings = true indent_style = "Block" +reorder_imports = true imports_granularity = "Crate" \ No newline at end of file diff --git a/src/aggregate/timeline.rs b/src/aggregate/timeline.rs deleted file mode 100644 index b30d2fc..0000000 --- a/src/aggregate/timeline.rs +++ /dev/null @@ -1,320 +0,0 @@ -use std::cmp::max; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; - -use crate::aggregate::{ - AggregateSettings, AggregateStorage, MetricAggregate, MetricAggregateBuilder, -}; -use crate::metric::MetricRecordError; - -pub struct TimelineItem { - time: Duration, - storage: S, - errors: usize, - users: usize, -} - -impl Eq for TimelineItem {} - -impl PartialEq for TimelineItem { - fn eq(&self, other: &Self) -> bool { - self.time.eq(&other.time) - } -} - -impl PartialOrd for TimelineItem { - fn partial_cmp(&self, other: &Self) -> Option { - self.time.partial_cmp(&other.time) - } -} - -impl Ord for TimelineItem { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.time.cmp(&other.time) - } -} - -pub struct TimelineAggregateBuilder { - settings: AggregateSettings, - storage: S, - users: Arc, -} - -pub struct TimelineAggregate { - settings: AggregateSettings, - timeline: Vec>, - storage: S, - users: Arc, -} - -impl TimelineItem -where - S: AggregateStorage, -{ - fn new(time: Duration, storage: S, errors: usize, users: usize) -> Self { - Self { - time, - storage, - errors, - users, - } - } - - pub fn time(&self) -> &Duration { - &self.time - } - - pub fn storage(&self) -> &S { - &self.storage - } - - pub fn errors(&self) -> usize { - self.errors - } - - pub fn users(&self) -> usize { - self.users - } - - fn record(&mut self, metric: S::Metric, value: u64) { - self.storage.record(metric, value) - } - - fn update_counters( - &mut self, - error: Option<&MetricRecordError>, - users: &Arc, - ) { - if error.is_some() { - self.errors += 1; - } - - self.users = users.load(Ordering::Relaxed) - } - - fn merge_into(self, other: &mut Self) { - let storage = std::mem::take(&mut other.storage); - other.storage = storage.merge(self.storage); - other.users = max(other.users, self.users); - other.errors += self.errors; - } -} - -impl TimelineAggregateBuilder -where - S: AggregateStorage, - S::Metric: Sync, -{ - pub fn new(storage: S) -> Self { - Self::with_settings(storage, AggregateSettings::default()) - } - - pub fn with_settings(storage: S, settings: AggregateSettings) -> Self { - Self { - settings, - storage, - users: Arc::new(AtomicUsize::new(0)), - } - } -} - -impl MetricAggregateBuilder for TimelineAggregateBuilder -where - S: AggregateStorage, - S::Metric: Sync, -{ - type Reporter = TimelineAggregate; - - fn build(&self) -> Self::Reporter { - self.users.fetch_add(1, Ordering::Relaxed); - TimelineAggregate { - timeline: Vec::new(), - storage: self.storage.clone(), - settings: self.settings, - users: self.users.clone(), - } - } -} - -impl MetricAggregate for TimelineAggregate -where - S: AggregateStorage, -{ - type Metric = S::Metric; - - fn add_entry( - &mut self, - metric: Self::Metric, - latency: Duration, - error: Option<&MetricRecordError>, - ) { - let time_window = self.settings.zero().window(self.settings.window()); - let item = match self.timeline.last_mut() { - Some(item) if item.time().eq(&time_window) => item, - _ => { - let position = self.timeline.len(); - self.timeline.push(TimelineItem::new( - time_window, - self.storage.clone(), - 0, - 0, - )); - &mut self.timeline[position] - } - }; - - item.record(metric, self.settings.scale().duration_to_value(latency)); - item.update_counters(error, &self.users); - } - - fn merge_into(self, other: &mut Self) { - for item in self.timeline.into_iter() { - match other.timeline.binary_search(&item) { - Ok(position) => { - item.merge_into(&mut other.timeline[position]); - } - Err(position) => other.timeline.insert(position, item), - } - } - } -} - -impl TimelineAggregate -where - S: AggregateStorage, - S::Metric: Sync, -{ - pub fn flush(self) -> Vec> { - self.timeline - } -} - -#[cfg(test)] -mod tests { - use tokio::time::advance; - - use crate::aggregate::{AggregateScale, MetricAggregateStorage}; - use crate::metric::Metric; - - use super::*; - - #[derive(Ord, PartialOrd, Eq, PartialEq, Hash, Copy, Clone)] - enum ReportMetric { - One, - Two, - } - - impl Metric for ReportMetric { - fn name(&self) -> &str { - "report_metric" - } - } - - #[tokio::test(start_paused = true)] - async fn aggregates_values_per_each_time_window() { - let mut reporter = TimelineAggregateBuilder::with_settings( - MetricAggregateStorage::default(), - AggregateSettings::default() - .with_window(Duration::from_millis(100)) - .with_scale(AggregateScale::Milliseconds), - ) - .build(); - - reporter.add_entry(ReportMetric::One, Duration::from_millis(10), None); - reporter.add_entry( - ReportMetric::Two, - Duration::from_millis(10), - Some(MetricRecordError::Timeout(Duration::from_millis(10))).as_ref(), - ); - advance(Duration::from_millis(200)).await; - reporter.add_entry(ReportMetric::Two, Duration::from_millis(20), None); - advance(Duration::from_millis(51)).await; - reporter.add_entry(ReportMetric::One, Duration::from_millis(40), None); - - verify_timeline( - vec![ - (Duration::from_millis(0), (10, 10), 1, 1), - (Duration::from_millis(200), (0, 20), 0, 1), - (Duration::from_millis(300), (40, 0), 0, 1), - ], - reporter.flush(), - ); - } - - #[tokio::test(start_paused = true)] - async fn merges_aggregated_values_per_each_time_window() { - let builder = TimelineAggregateBuilder::with_settings( - MetricAggregateStorage::default(), - AggregateSettings::default() - .with_window(Duration::from_millis(100)) - .with_scale(AggregateScale::Milliseconds), - ); - - let mut reporter_one = builder.build(); - let mut reporter_two = builder.build(); - - reporter_one.add_entry(ReportMetric::One, Duration::from_millis(10), None); - reporter_two.add_entry( - ReportMetric::Two, - Duration::from_millis(10), - Some(MetricRecordError::Timeout(Duration::from_millis(10))).as_ref(), - ); - advance(Duration::from_millis(200)).await; - reporter_one.add_entry(ReportMetric::Two, Duration::from_millis(20), None); - advance(Duration::from_millis(51)).await; - reporter_two.add_entry(ReportMetric::One, Duration::from_millis(40), None); - - let mut aggregated = builder.build(); - reporter_one.merge_into(&mut aggregated); - reporter_two.merge_into(&mut aggregated); - - verify_timeline( - vec![ - (Duration::from_millis(0), (10, 10), 1, 2), - (Duration::from_millis(200), (0, 20), 0, 2), - (Duration::from_millis(300), (40, 0), 0, 2), - ], - aggregated.flush(), - ); - } - - fn verify_timeline( - expected_values: Vec<(Duration, (u64, u64), usize, usize)>, - result: Vec>>, - ) { - assert_eq!( - result.len(), - expected_values.len(), - "Number of timeline values does not match" - ); - for ( - item, - ( - expected_time, - (expected_metric_one, expected_metric_two), - expected_errors, - expected_users, - ), - ) in result.into_iter().zip(expected_values) - { - assert_eq!(*item.time(), expected_time, "Time window does not match"); - assert_eq!( - item.storage().value(ReportMetric::One).min(), - expected_metric_one, - "Minimum metric one does not match" - ); - assert_eq!( - item.storage().value(ReportMetric::Two).min(), - expected_metric_two, - "Minimum metric two does not match" - ); - assert_eq!( - item.errors(), - expected_errors, - "Number of errors does not match" - ); - assert_eq!(item.users(), expected_users, "Number of users does match"); - } - } -} diff --git a/src/scenario/mod.rs b/src/scenario/mod.rs deleted file mode 100644 index 12c2c4f..0000000 --- a/src/scenario/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub trait ScenarioBuilder {} - -pub trait Scenario {}