diff --git a/kvrocks.conf b/kvrocks.conf index 16a980e5d51..461d077eb3f 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -20,13 +20,13 @@ bind 127.0.0.1 # unixsocket /tmp/kvrocks.sock # unixsocketperm 777 -# Allows a parent process to open a socket and pass its FD down to kvrocks as a child +# Allows a parent process to open a socket and pass its FD down to kvrocks as a child # process. Useful to reserve a port and prevent race conditions. -# -# PLEASE NOTE: -# If this is overridden to a value other than -1, the bind and tls* directives will be +# +# PLEASE NOTE: +# If this is overridden to a value other than -1, the bind and tls* directives will be # ignored. -# +# # Default: -1 (not overridden, defer to creating a connection to the specified port) socket-fd -1 @@ -369,10 +369,22 @@ json-storage-format json # NOTE: This is an experimental feature. If you find errors, performance degradation, # excessive memory usage, excessive disk I/O, etc. after enabling it, please try disabling it. # At the same time, we welcome feedback on related issues to help iterative improvements. -# +# # Default: no txn-context-enabled no +# Define the histogram bucket values. +# +# If enabled, those values will be used to store the command execution latency values +# in buckets defined below. The values should be integers and must be sorted. +# An implicit bucket (+Inf in prometheus jargon) will be added to track the highest values +# that are beyond the bucket limits. + +# NOTE: This is an experimental feature. There might be some performance overhead when using this +# feature, please be aware. +# Default: disabled +# histogram-bucket-boundaries 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000 + ################################## TLS ################################### # By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0. @@ -1031,7 +1043,7 @@ rocksdb.partition_filters yes # Specifies the maximum size in bytes for a write batch in RocksDB. # If set to 0, there is no size limit for write batches. # This option can help control memory usage and manage large WriteBatch operations more effectively. -# +# # Default: 0 # rocksdb.write_options.write_batch_max_bytes 0 diff --git a/src/config/config.cc b/src/config/config.cc index fe7e3fb0e2f..284f47e9d4d 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -239,6 +239,7 @@ Config::Config() { new EnumField(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, + {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")}, /* rocksdb options */ {"rocksdb.compression", false, @@ -754,6 +755,25 @@ void Config::initFieldCallback() { {"tls-session-cache-size", set_tls_option}, {"tls-session-cache-timeout", set_tls_option}, #endif + {"histogram-bucket-boundaries", + [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { + std::vector buckets = util::Split(v, ","); + histogram_bucket_boundaries.clear(); + if (buckets.size() < 1) { + return Status::OK(); + } + for (const auto &bucket_val : buckets) { + auto parse_result = ParseFloat(bucket_val); + if (!parse_result) { + return {Status::NotOK, "The values in the bucket list must be double or integer."}; + } + histogram_bucket_boundaries.push_back(*parse_result); + } + if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) { + return {Status::NotOK, "The values for the histogram must be sorted."}; + } + return Status::OK(); + }}, }; for (const auto &iter : callbacks) { auto field_iter = fields_.find(iter.first); diff --git a/src/config/config.h b/src/config/config.h index 9fa5d4168fc..7c76759c241 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -177,6 +177,8 @@ struct Config { bool skip_block_cache_deallocation_on_close = false; + std::vector histogram_bucket_boundaries; + struct RocksDB { int block_size; bool cache_index_and_filter_blocks; @@ -260,6 +262,7 @@ struct Config { std::string profiling_sample_commands_str_; std::map> fields_; std::vector rename_command_; + std::string histogram_bucket_boundaries_str_; void initFieldValidator(); void initFieldCallback(); diff --git a/src/server/server.cc b/src/server/server.cc index 5b52eb333fb..e809444baa3 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -52,7 +52,8 @@ #include "worker.h" Server::Server(engine::Storage *storage, Config *config) - : storage(storage), + : stats(config->histogram_bucket_boundaries), + storage(storage), indexer(storage), index_mgr(&indexer, storage), start_time_secs_(util::GetTimeStamp()), @@ -60,9 +61,19 @@ Server::Server(engine::Storage *storage, Config *config) namespace_(storage) { // init commands stats here to prevent concurrent insert, and cause core auto commands = redis::CommandTable::GetOriginal(); + for (const auto &iter : *commands) { stats.commands_stats[iter.first].calls = 0; stats.commands_stats[iter.first].latency = 0; + + if (stats.bucket_boundaries.size() > 0) { + // NB: Extra index for the last bucket (Inf) + for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) { + stats.commands_histogram[iter.first].buckets.push_back(std::make_unique>(0)); + } + stats.commands_histogram[iter.first].calls = 0; + stats.commands_histogram[iter.first].sum = 0; + } } // init cursor_dict_ @@ -1165,6 +1176,25 @@ void Server::GetCommandsStatsInfo(std::string *info) { << ",usec_per_call=" << static_cast(latency / calls) << "\r\n"; } + for (const auto &cmd_hist : stats.commands_histogram) { + auto command_name = cmd_hist.first; + auto calls = stats.commands_histogram[command_name].calls.load(); + if (calls == 0) continue; + + auto sum = stats.commands_histogram[command_name].sum.load(); + string_stream << "cmdstathist_" << command_name << ":"; + for (std::size_t i{0}; i < stats.commands_histogram[command_name].buckets.size(); ++i) { + auto bucket_value = stats.commands_histogram[command_name].buckets[i]->load(); + auto bucket_bound = std::numeric_limits::infinity(); + if (i < stats.bucket_boundaries.size()) { + bucket_bound = stats.bucket_boundaries[i]; + } + + string_stream << bucket_bound << "=" << bucket_value << ","; + } + string_stream << "sum=" << sum << ",count=" << calls << "\r\n"; + } + *info = string_stream.str(); } diff --git a/src/stats/stats.cc b/src/stats/stats.cc index ae18638b221..5baea34d6f6 100644 --- a/src/stats/stats.cc +++ b/src/stats/stats.cc @@ -26,7 +26,7 @@ #include "fmt/format.h" #include "time_util.h" -Stats::Stats() { +Stats::Stats(std::vector bucket_boundaries) : bucket_boundaries(std::move(bucket_boundaries)) { for (int i = 0; i < STATS_METRIC_COUNT; i++) { InstMetric im; im.last_sample_time_ms = 0; @@ -86,10 +86,22 @@ int64_t Stats::GetMemoryRSS() { void Stats::IncrCalls(const std::string &command_name) { total_calls.fetch_add(1, std::memory_order_relaxed); commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed); + + if (bucket_boundaries.size() > 0) { + commands_histogram[command_name].calls.fetch_add(1, std::memory_order_relaxed); + } } void Stats::IncrLatency(uint64_t latency, const std::string &command_name) { commands_stats[command_name].latency.fetch_add(latency, std::memory_order_relaxed); + + if (bucket_boundaries.size() > 0) { + commands_histogram[command_name].sum.fetch_add(latency, std::memory_order_relaxed); + + const auto bucket_index = static_cast(std::distance( + bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(), bucket_boundaries.end(), latency))); + commands_histogram[command_name].buckets[bucket_index]->fetch_add(1, std::memory_order_relaxed); + } } void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) { diff --git a/src/stats/stats.h b/src/stats/stats.h index e00506a9672..0bae042d640 100644 --- a/src/stats/stats.h +++ b/src/stats/stats.h @@ -22,8 +22,10 @@ #include +#include #include #include +#include #include #include #include @@ -43,6 +45,13 @@ enum StatsMetricFlags { constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric +// Experimental part to support histograms for cmd statistics +struct CommandHistogram { + std::vector>> buckets; + std::atomic calls; + std::atomic sum; +}; + struct CommandStat { std::atomic calls; std::atomic latency; @@ -69,7 +78,12 @@ class Stats { std::atomic psync_ok_count = {0}; std::map commands_stats; - Stats(); + using BucketBoundaries = std::vector; + BucketBoundaries bucket_boundaries; + std::map commands_histogram; + + explicit Stats(std::vector histogram_bucket_boundaries); + void IncrCalls(const std::string &command_name); void IncrLatency(uint64_t latency, const std::string &command_name); void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); } diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index 4e9b3817a9f..2a6ac1b5608 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -130,6 +130,8 @@ TEST(Config, GetAndSet) { {"rocksdb.rate_limiter_auto_tuned", "yes"}, {"rocksdb.compression_level", "32767"}, {"rocksdb.wal_compression", "no"}, + {"histogram-bucket-boundaries", "10,100,1000,10000"}, + }; for (const auto &iter : immutable_cases) { s = config.Set(nullptr, iter.first, iter.second); diff --git a/tests/gocase/unit/info/info_test.go b/tests/gocase/unit/info/info_test.go index dd4b0cdcc78..128e57748f3 100644 --- a/tests/gocase/unit/info/info_test.go +++ b/tests/gocase/unit/info/info_test.go @@ -23,17 +23,20 @@ import ( "context" "fmt" "strconv" + "strings" "testing" "time" - "github.com/redis/go-redis/v9" - "github.com/apache/kvrocks/tests/gocase/util" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) func TestInfo(t *testing.T) { - srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + srv0 := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "histogram-bucket-boundaries": "10,20,30,50", + }) defer func() { srv0.Close() }() rdb0 := srv0.NewClient() defer func() { require.NoError(t, rdb0.Close()) }() @@ -102,6 +105,20 @@ func TestInfo(t *testing.T) { t.Run("get cluster information by INFO - cluster enabled", func(t *testing.T) { require.Equal(t, "1", util.FindInfoEntry(rdb0, "cluster_enabled", "cluster")) }) + + t.Run("get command latencies via histogram INFO - histogram-bucket-boundaries", func(t *testing.T) { + output := util.FindInfoEntry(rdb0, "cmdstathist_info", "commandstats") + + splitValues := strings.FieldsFunc(output, func(r rune) bool { + return r == '=' || r == ',' + }) + + // expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=.. + require.GreaterOrEqual(t, len(splitValues), 14) + require.Contains(t, splitValues, "sum") + require.Contains(t, splitValues, "count") + require.Contains(t, splitValues, "inf") + }) } func TestKeyspaceHitMiss(t *testing.T) {