From 2d5cbf90afd7765605ffaefa02a2da602a4767c4 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 10 Aug 2019 15:04:53 -0700 Subject: [PATCH 1/6] Fix error in RESP protocol parser --- akumulid/protocolparser.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/akumulid/protocolparser.cpp b/akumulid/protocolparser.cpp index f961f9a5..4c0a02b4 100644 --- a/akumulid/protocolparser.cpp +++ b/akumulid/protocolparser.cpp @@ -509,15 +509,20 @@ bool RESPProtocolParser::parse_values(RESPStream& stream, } } } else { - if (next == RESPStream::STRING) { - if (!parse_event_value(i)) { + switch (next) { + case RESPStream::STRING: + if (!parse_event_value(i)) { + return false; + } + break; + case RESPStream::_AGAIN: return false; + default: { + std::string msg; + size_t pos; + std::tie(msg, pos) = rdbuf_.get_error_context("unexpected event format"); + BOOST_THROW_EXCEPTION(ProtocolParserError(msg, pos)); } - } else { - std::string msg; - size_t pos; - std::tie(msg, pos) = rdbuf_.get_error_context("unexpected event format"); - BOOST_THROW_EXCEPTION(ProtocolParserError(msg, pos)); } } } From 13702b9a9a2a065846599c7639049ad7f00fabbe Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sun, 11 Aug 2019 15:20:52 -0700 Subject: [PATCH 2/6] Add framing tests --- unittests/test_protocolparser.cpp | 115 ++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 6 deletions(-) diff --git a/unittests/test_protocolparser.cpp b/unittests/test_protocolparser.cpp index c7dd2a95..f4aeb277 100644 --- a/unittests/test_protocolparser.cpp +++ b/unittests/test_protocolparser.cpp @@ -16,14 +16,25 @@ struct ConsumerMock : DbSession { std::vector param_; std::vector ts_; std::vector data_; + std::vector event_; virtual ~ConsumerMock() {} virtual aku_Status write(const aku_Sample &sample) override { - param_.push_back(sample.paramid); - ts_.push_back(sample.timestamp); - data_.push_back(sample.payload.float64); - return 
AKU_SUCCESS; + if (sample.payload.type == AKU_PAYLOAD_FLOAT) { + param_.push_back(sample.paramid); + ts_.push_back(sample.timestamp); + data_.push_back(sample.payload.float64); + return AKU_SUCCESS; + } + else if (sample.payload.type == AKU_PAYLOAD_EVENT) { + param_.push_back(sample.paramid); + ts_.push_back(sample.timestamp); + int len = sample.payload.size - sizeof(aku_Sample); + event_.push_back(std::string(sample.payload.data, sample.payload.data + len)); + return AKU_SUCCESS; + } + return AKU_EBAD_ARG; } virtual std::shared_ptr query(std::string) override { @@ -46,9 +57,16 @@ struct ConsumerMock : DbSession { } virtual aku_Status series_to_param_id(const char* begin, size_t sz, aku_Sample* sample) override { + int sign = 1; + if (*begin == '!' && sz > 1) { + // Events names start with ! + begin++; + sz--; + sign = -1; + } std::string num(begin, begin + sz); boost::algorithm::trim(num); - sample->paramid = boost::lexical_cast(num); + sample->paramid = sign * boost::lexical_cast(num); return AKU_SUCCESS; } @@ -64,8 +82,14 @@ struct ConsumerMock : DbSession { while(*it_end != '|' && it_end < end) { it_end++; } + int sign = 1; + if (*it_begin == '!') { + // Events names start with ! 
+ it_begin++; + sign = -1; + } std::string val(it_begin, it_end); - ids[i] = boost::lexical_cast(val); + ids[i] = sign * boost::lexical_cast(val); it_end++; it_begin = it_end; } @@ -395,6 +419,85 @@ BOOST_AUTO_TEST_CASE(Test_protocol_parser_framing_dict) { } } +BOOST_AUTO_TEST_CASE(Test_protocol_parser_framing_event) { + + const char *message = "+!1\r\n:2\r\n+event1\r\n" + "+!2\r\n:7\r\n+event2\r\n" + "+!3\r\n:11\r\n+event3\r\n" + "+!4\r\n:15\r\n+event4\r\n"; + + auto pred = [] (std::shared_ptr cons) { + + BOOST_REQUIRE_EQUAL(cons->param_.size(), 4); + // 0 + BOOST_REQUIRE_EQUAL(cons->param_[0], static_cast(-1)); + BOOST_REQUIRE_EQUAL(cons->ts_[0], 2); + BOOST_REQUIRE_EQUAL(cons->event_[0], "event1"); + // 1 + BOOST_REQUIRE_EQUAL(cons->param_[1], static_cast(-2)); + BOOST_REQUIRE_EQUAL(cons->ts_[1], 7); + BOOST_REQUIRE_EQUAL(cons->event_[1], "event2"); + // 2 + BOOST_REQUIRE_EQUAL(cons->param_[2], static_cast(-3)); + BOOST_REQUIRE_EQUAL(cons->ts_[2], 11); + BOOST_REQUIRE_EQUAL(cons->event_[2], "event3"); + // 3 + BOOST_REQUIRE_EQUAL(cons->param_[3], static_cast(-4)); + BOOST_REQUIRE_EQUAL(cons->ts_[3], 15); + BOOST_REQUIRE_EQUAL(cons->event_[3], "event4"); + }; + + size_t msglen = strlen(message); + + for (int i = 0; i < 100; i++) { + size_t pivot1 = 1 + static_cast(rand()) % (msglen / 2); + size_t pivot2 = 1+ static_cast(rand()) % (msglen - pivot1 - 2) + pivot1; + std::shared_ptr cons(new ConsumerMock); + find_framing_issues(message, msglen, pivot1, pivot2, pred, cons); + } +} + +BOOST_AUTO_TEST_CASE(Test_protocol_parser_framing_event_bulk) { + + const char *message = "+!1|!2|!3|!4\r\n" + ":9\r\n" + "*4\r\n" + "+event1\r\n" + "+event2\r\n" + "+event3\r\n" + "+event4\r\n"; + + auto pred = [] (std::shared_ptr cons) { + + BOOST_REQUIRE_EQUAL(cons->param_.size(), 4); + // 0 + BOOST_REQUIRE_EQUAL(cons->param_[0], static_cast(-1)); + BOOST_REQUIRE_EQUAL(cons->ts_[0], 9); + BOOST_REQUIRE_EQUAL(cons->event_[0], "event1"); + // 1 + BOOST_REQUIRE_EQUAL(cons->param_[1], 
static_cast(-2)); + BOOST_REQUIRE_EQUAL(cons->ts_[1], 9); + BOOST_REQUIRE_EQUAL(cons->event_[1], "event2"); + // 2 + BOOST_REQUIRE_EQUAL(cons->param_[2], static_cast(-3)); + BOOST_REQUIRE_EQUAL(cons->ts_[2], 9); + BOOST_REQUIRE_EQUAL(cons->event_[2], "event3"); + // 3 + BOOST_REQUIRE_EQUAL(cons->param_[3], static_cast(-4)); + BOOST_REQUIRE_EQUAL(cons->ts_[3], 9); + BOOST_REQUIRE_EQUAL(cons->event_[3], "event4"); + }; + + size_t msglen = strlen(message); + + for (int i = 0; i < 100; i++) { + size_t pivot1 = 1 + static_cast(rand()) % (msglen / 2); + size_t pivot2 = 1+ static_cast(rand()) % (msglen - pivot1 - 2) + pivot1; + std::shared_ptr cons(new ConsumerMock); + find_framing_issues(message, msglen, pivot1, pivot2, pred, cons); + } +} + struct NameCheckingConsumer : DbSession { enum { ID = 101 }; From c8f2616d2211b7a0045c3748692ddc97aa8f60f6 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Mon, 12 Aug 2019 14:32:16 -0700 Subject: [PATCH 3/6] Add --config configuration parameter --- akumulid/main.cpp | 89 +++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/akumulid/main.cpp b/akumulid/main.cpp index eedc8db3..a1c937e6 100644 --- a/akumulid/main.cpp +++ b/akumulid/main.cpp @@ -117,7 +117,10 @@ nvolumes=4 struct ConfigFile { typedef boost::property_tree::ptree PTree; - static boost::filesystem::path default_config_path() { + static boost::filesystem::path get_config_path(boost::optional config_path) { + if (config_path) { + return expand_path(*config_path); + } auto path2cfg = boost::filesystem::path(getenv("HOME")); path2cfg /= ".akumulid"; return path2cfg; @@ -448,9 +451,9 @@ void create_db_files(const char* path, /** Read configuration file and run server. * If config file can't be found - report error. 
*/ -void cmd_run_server() { +void cmd_run_server(boost::optional cmd_config_path) { - auto config_path = ConfigFile::default_config_path(); + auto config_path = ConfigFile::get_config_path(cmd_config_path); auto config = ConfigFile::read_config_file(config_path); auto path = ConfigFile::get_path(config); auto ingestion_servers = ConfigFile::get_server_settings(config); @@ -537,9 +540,8 @@ void cmd_run_server() { /** Create database command. */ -void cmd_create_database(bool test_db=false, bool allocate=false) { - auto config_path = ConfigFile::default_config_path(); - +void cmd_create_database(boost::optional cmd_config_path, bool test_db=false, bool allocate=false) { + auto config_path = ConfigFile::get_config_path(cmd_config_path); auto config = ConfigFile::read_config_file(config_path); auto path = ConfigFile::get_path(config); auto volumes = ConfigFile::get_nvolumes(config); @@ -552,12 +554,11 @@ void cmd_create_database(bool test_db=false, bool allocate=false) { create_db_files(path.c_str(), volumes, volsize, allocate); } -void cmd_delete_database() { - auto config_path = ConfigFile::default_config_path(); - - auto config = ConfigFile::read_config_file(config_path); - auto path = ConfigFile::get_path(config); - auto wal_path = ConfigFile::get_wal_settings(config).path; +void cmd_delete_database(boost::optional cmd_config_path) { + auto config_path = ConfigFile::get_config_path(cmd_config_path); + auto config = ConfigFile::read_config_file(config_path); + auto path = ConfigFile::get_path(config); + auto wal_path = ConfigFile::get_wal_settings(config).path; auto full_path = boost::filesystem::path(path) / "db.akumuli"; if (boost::filesystem::exists(full_path)) { @@ -583,10 +584,10 @@ void cmd_delete_database() { } } -void cmd_dump_debug_information(const char* outfname) { - auto config_path = ConfigFile::default_config_path(); - auto config = ConfigFile::read_config_file(config_path); - auto path = ConfigFile::get_path(config); +void 
cmd_dump_debug_information(boost::optional cmd_config_path, const char* outfname) { + auto config_path = ConfigFile::get_config_path(cmd_config_path); + auto config = ConfigFile::read_config_file(config_path); + auto path = ConfigFile::get_path(config); auto full_path = boost::filesystem::path(path) / "db.akumuli"; if (boost::filesystem::exists(full_path)) { @@ -615,10 +616,10 @@ void cmd_dump_debug_information(const char* outfname) { } } -void cmd_dump_recovery_debug_information(const char* outfname) { - auto config_path = ConfigFile::default_config_path(); - auto config = ConfigFile::read_config_file(config_path); - auto path = ConfigFile::get_path(config); +void cmd_dump_recovery_debug_information(boost::optional cmd_config_path, const char* outfname) { + auto config_path = ConfigFile::get_config_path(cmd_config_path); + auto config = ConfigFile::read_config_file(config_path); + auto path = ConfigFile::get_path(config); auto full_path = boost::filesystem::path(path) / "db.akumuli"; if (boost::filesystem::exists(full_path)) { @@ -665,17 +666,10 @@ int main(int argc, char** argv) { try { std::locale::global(std::locale("C")); - aku_initialize(&panic_handler, &static_logger); - - // Init logger - auto path = ConfigFile::default_config_path(); - if (boost::filesystem::exists(path)) { - Logger::init(path.c_str()); - } - po::options_description cli_only_options; cli_only_options.add_options() ("help", "Produce help message") + ("config", po::value(), "Path to configuration file") ("create", "Create database") ("allocate", "Preallocate disk space") ("delete", "Delete database") @@ -691,6 +685,24 @@ int main(int argc, char** argv) { po::store(po::parse_command_line(argc, argv, cli_only_options), vm); po::notify(vm); + if (vm.count("help")) { + rich_print(CLI_HELP_MESSAGE); + exit(EXIT_SUCCESS); + } + + boost::optional cmd_config_path; + if (vm.count("config")) { + cmd_config_path = vm["config"].as(); + } + + aku_initialize(&panic_handler, &static_logger); + + // Init 
logger + auto path = ConfigFile::get_config_path(cmd_config_path); + if (boost::filesystem::exists(path)) { + Logger::init(path.c_str()); + } + std::stringstream header; #ifndef AKU_VERSION header << "\n\nStarted\n\n"; @@ -704,11 +716,6 @@ int main(int argc, char** argv) { header << "\n\n"; logger.info() << header.str(); - if (vm.count("help")) { - rich_print(CLI_HELP_MESSAGE); - exit(EXIT_SUCCESS); - } - if (vm.count("init")) { bool disable_wal = vm.count("disable-wal"); ConfigFile::init_config(path, disable_wal); @@ -733,26 +740,26 @@ int main(int argc, char** argv) { bool allocate = false; if(vm.count("allocate")) allocate = true; - cmd_create_database(false, allocate); + cmd_create_database(cmd_config_path, false, allocate); exit(EXIT_SUCCESS); } if (vm.count("CI")) { - cmd_create_database(true); + cmd_create_database(cmd_config_path, true); exit(EXIT_SUCCESS); } if (vm.count("delete")) { - cmd_delete_database(); + cmd_delete_database(cmd_config_path); exit(EXIT_SUCCESS); } if (vm.count("debug-dump")) { auto path = vm["debug-dump"].as(); if (path == "stdout") { - cmd_dump_debug_information(nullptr); + cmd_dump_debug_information(cmd_config_path, nullptr); } else { - cmd_dump_debug_information(path.c_str()); + cmd_dump_debug_information(cmd_config_path, path.c_str()); } exit(EXIT_SUCCESS); } @@ -760,14 +767,14 @@ int main(int argc, char** argv) { if (vm.count("debug-recovery-dump")) { auto path = vm["debug-recovery-dump"].as(); if (path == "stdout") { - cmd_dump_recovery_debug_information(nullptr); + cmd_dump_recovery_debug_information(cmd_config_path, nullptr); } else { - cmd_dump_recovery_debug_information(path.c_str()); + cmd_dump_recovery_debug_information(cmd_config_path, path.c_str()); } exit(EXIT_SUCCESS); } - cmd_run_server(); + cmd_run_server(cmd_config_path); logger.info() << "\n\nClean exit\n\n"; From 42397bc00708de927353962bdea8e76108a9f735 Mon Sep 17 00:00:00 2001 From: Eugene Lazin <4lazin@gmail.com> Date: Tue, 13 Aug 2019 16:22:00 +0300 Subject: 
[PATCH 4/6] Update version/changelog --- CHANGELOG.md | 11 +++++++++++ CMakeLists.txt | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e19c252..16fc26dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,17 @@ Release notes ============= +Version 0.8.69 +-------------- + +IMPROVEMENT + +* Add --config command line argument + +BUG FIX + +* Fix framing issue in RESP protocol parser + Version 0.8.68 -------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index b1d51aae..249b4b94 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8) set(APP_VERSION_MAJOR "0") set(APP_VERSION_MINOR "8") -set(APP_VERSION_PATCH "68") +set(APP_VERSION_PATCH "69") set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}") add_definitions(-DAKU_VERSION="${APP_VERSION}") From f33e25cab78b52f9133704b8f48c4dfc9da95a78 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Tue, 13 Aug 2019 14:53:47 -0700 Subject: [PATCH 5/6] Update README --- README.md | 51 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 1cc9a24a..9da60027 100644 --- a/README.md +++ b/README.md @@ -8,25 +8,30 @@ The word "akumuli" can be translated from Esperanto as "accumulate". Features ------- -* True column-oriented format (not PAX). +* Column-oriented storage. * Based on novel [LSM and B+tree hybrid datastructure](http://akumuli.org/akumuli/2017/04/29/nbplustree/) with multiversion concurrency control (no concurrency bugs, parallel writes, optimized for SSD and NVMe). +* Supports both metrics and arbitrary events. +* Fast and effecient compression algorithm that outperforms 'Gorilla' time-series compression. * Crash safety and recovery. * Fast aggregation without pre-configured rollups or materialized views. -* Queries can be executed without decompressing the data. 
-* Fast compression algorithm (dictionary + entropy) with small memory overhead (about 2.5 bytes per element on appropriate data). +* Many queries can be executed without decompressing the data. * Compressed in-memory storage for recent data. -* Can be used as a server application or an embedded library. -* Simple query language based on JSON and HTTP. +* Can be used as a server application or embedded library. +* Simple API based on JSON and HTTP. * Fast range scans and joins, read speed doesn't depend on database cardinality. -* Fast data ingestion over the network: - * 4.5M data points per second on 8-core Intel Xeon E5-2670 v2 (m3.2xlarge EC2 instance). - * 16.1M data points per second on 32-core Intel Xeon E5-2680 v2 (c3.8xlarge EC2 instance). -* Query results are streamed to client using the chunked transfer encoding of the HTTP protocol. -* Decompression algorithm and input parsers were fuzz-tested. +* Fast data ingestion: + * 5.4M writes/sec on DigitalOcean droplet with 8-cores 32GB of RAM (using only 6 cores) + * 4.6M writes/sec on DigitalOcean droplet with 8-cores 32GB of RAM (6 cores with enabled WAL) + * 16.1M writes/sec on 32-core Intel Xeon E5-2680 v2 (c3.8xlarge EC2 instance). +* Queries are executed lazily. Query results are produced as long as client reads them. +* Compression algorithm and input parsers are fuzz-tested on every code change. * Grafana [datasource](https://github.com/akumuli/akumuli-datasource) plugin. -* Fast and compact inverted index for series lookup. +* Fast and compact inverted index for time-series lookup. 
+Roadmap +------ + |Storage engine features |Current version|Future versions| |-------------------------------|---------------|---------------| |Inserts |In order |Out of order | @@ -36,7 +41,7 @@ Features |Compression |+ |+ | |Tags |+ |+ | |High-throughput ingestion |+ |+ | -|High cardinality |- |+ | +|High cardinality |+ |+ | |Crash recovery |+ |+ | |Incremental backup |- |+ | |Clustering |- |+ | @@ -60,6 +65,13 @@ Features |Filter & group-aggregate |+ |+ | |Filter & join |+ |+ | + + +Supported Architectures +----------------------- + +Akumuli supports 64 and 32-bit Intel processors. It also works on 64 and 32-bit ARM processors but these architectures are not covered by continous integration. + Gettings Started ---------------- * You can find [documentation](https://akumuli.gitbook.io/docs) here @@ -73,12 +85,15 @@ Supported Platforms Pre-built [Debian/RPM packages](https://packagecloud.io/Lazin/Akumuli) for the following platforms are available via packagecloud: -* Ubuntu 14.04 -* Ubuntu 16.04 -* Ubuntu 18.04 -* Debian Jessie -* Debian Stretch -* CentOS 7 +* AMD 64 Ubuntu 14.04 +* AMD 64 Ubuntu 16.04 +* AMD 64 Ubuntu 18.04 +* AMD 64 Debian Jessie +* AMD 64 Debian Stretch +* AMD 64 CentOS 7 +* ARM 64 Ubuntu 16.04 +* ARM 64 Ubuntu 18.04 +* ARM 64 CentOS 7 Docker image is availabe through [Docker Hub](https://hub.docker.com/r/akumuli/akumuli/tags/). 
From 67a88e6caae9eff3258b44612e61a49689386fe3 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Wed, 14 Aug 2019 12:29:12 -0700 Subject: [PATCH 6/6] Update README --- README.md | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 9da60027..2b9dbd71 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -README [![Build Status](https://travis-ci.org/akumuli/Akumuli.svg?branch=master)](https://travis-ci.org/akumuli/Akumuli) [![Coverity Scan Build Status](https://scan.coverity.com/projects/8879/badge.svg)](https://scan.coverity.com/projects/akumuli) [![Join the chat at https://gitter.im/akumuli/Akumuli](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/akumuli/Akumuli?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +README [![Build Status](https://travis-ci.org/akumuli/Akumuli.svg?branch=master)](https://travis-ci.org/akumuli/Akumuli) [![Coverity Scan Build Status](https://scan.coverity.com/projects/8879/badge.svg)](https://scan.coverity.com/projects/akumuli) ====== **Akumuli** is a time-series database for modern hardware. @@ -10,7 +10,7 @@ Features * Column-oriented storage. * Based on novel [LSM and B+tree hybrid datastructure](http://akumuli.org/akumuli/2017/04/29/nbplustree/) with multiversion concurrency control (no concurrency bugs, parallel writes, optimized for SSD and NVMe). -* Supports both metrics and arbitrary events. +* Supports both metrics and events. * Fast and effecient compression algorithm that outperforms 'Gorilla' time-series compression. * Crash safety and recovery. * Fast aggregation without pre-configured rollups or materialized views. @@ -66,12 +66,6 @@ Roadmap |Filter & join |+ |+ | - -Supported Architectures ------------------------ - -Akumuli supports 64 and 32-bit Intel processors. It also works on 64 and 32-bit ARM processors but these architectures are not covered by continous integration. 
- Gettings Started ---------------- * You can find [documentation](https://akumuli.gitbook.io/docs) here @@ -82,6 +76,8 @@ Gettings Started Supported Platforms ------------------- +Akumuli supports 64 and 32-bit Intel processors. It also works on 64 and 32-bit ARM processors but these architectures are not covered by continuous integration. + Pre-built [Debian/RPM packages](https://packagecloud.io/Lazin/Akumuli) for the following platforms are available via packagecloud: