Skip to content

Commit

Permalink
Merge pull request #314 from akumuli/fix-protocol-parser
Browse files Browse the repository at this point in the history
Fix protocol parser
Fix #313
  • Loading branch information
Lazin authored Aug 14, 2019
2 parents 1a9ff2c + 67a88e6 commit cddfb74
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 74 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
Release notes
=============

Version 0.8.69
--------------

IMPROVEMENT

* Add --config command line argument

BUG FIX

* Fix framing issue in RESP protocol parser

Version 0.8.68
--------------

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)

set(APP_VERSION_MAJOR "0")
set(APP_VERSION_MINOR "8")
set(APP_VERSION_PATCH "68")
set(APP_VERSION_PATCH "69")

set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}")
add_definitions(-DAKU_VERSION="${APP_VERSION}")
Expand Down
49 changes: 30 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
README [![Build Status](https://travis-ci.org/akumuli/Akumuli.svg?branch=master)](https://travis-ci.org/akumuli/Akumuli) [![Coverity Scan Build Status](https://scan.coverity.com/projects/8879/badge.svg)](https://scan.coverity.com/projects/akumuli) [![Join the chat at https://gitter.im/akumuli/Akumuli](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/akumuli/Akumuli?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
README [![Build Status](https://travis-ci.org/akumuli/Akumuli.svg?branch=master)](https://travis-ci.org/akumuli/Akumuli) [![Coverity Scan Build Status](https://scan.coverity.com/projects/8879/badge.svg)](https://scan.coverity.com/projects/akumuli)
======

**Akumuli** is a time-series database for modern hardware.
Expand All @@ -8,25 +8,30 @@ The word "akumuli" can be translated from Esperanto as "accumulate".
Features
-------

* True column-oriented format (not PAX).
* Column-oriented storage.
* Based on novel [LSM and B+tree hybrid datastructure](http://akumuli.org/akumuli/2017/04/29/nbplustree/) with multiversion concurrency control (no concurrency bugs, parallel writes, optimized for SSD and NVMe).
* Supports both metrics and events.
* Fast and effecient compression algorithm that outperforms 'Gorilla' time-series compression.
* Crash safety and recovery.
* Fast aggregation without pre-configured rollups or materialized views.
* Queries can be executed without decompressing the data.
* Fast compression algorithm (dictionary + entropy) with small memory overhead (about 2.5 bytes per element on appropriate data).
* Many queries can be executed without decompressing the data.
* Compressed in-memory storage for recent data.
* Can be used as a server application or an embedded library.
* Simple query language based on JSON and HTTP.
* Can be used as a server application or embedded library.
* Simple API based on JSON and HTTP.
* Fast range scans and joins, read speed doesn't depend on database cardinality.
* Fast data ingestion over the network:
* 4.5M data points per second on 8-core Intel Xeon E5-2670 v2 (m3.2xlarge EC2 instance).
* 16.1M data points per second on 32-core Intel Xeon E5-2680 v2 (c3.8xlarge EC2 instance).
* Query results are streamed to client using the chunked transfer encoding of the HTTP protocol.
* Decompression algorithm and input parsers were fuzz-tested.
* Fast data ingestion:
* 5.4M writes/sec on DigitalOcean droplet with 8-cores 32GB of RAM (using only 6 cores)
* 4.6M writes/sec on DigitalOcean droplet with 8-cores 32GB of RAM (6 cores with enabled WAL)
* 16.1M writes/sec on 32-core Intel Xeon E5-2680 v2 (c3.8xlarge EC2 instance).
* Queries are executed lazily. Query results are produced as long as client reads them.
* Compression algorithm and input parsers are fuzz-tested on every code change.
* Grafana [datasource](https://github.com/akumuli/akumuli-datasource) plugin.
* Fast and compact inverted index for series lookup.
* Fast and compact inverted index for time-series lookup.


Roadmap
------

|Storage engine features |Current version|Future versions|
|-------------------------------|---------------|---------------|
|Inserts |In order |Out of order |
Expand All @@ -36,7 +41,7 @@ Features
|Compression |+ |+ |
|Tags |+ |+ |
|High-throughput ingestion |+ |+ |
|High cardinality |- |+ |
|High cardinality |+ |+ |
|Crash recovery |+ |+ |
|Incremental backup |- |+ |
|Clustering |- |+ |
Expand All @@ -60,6 +65,7 @@ Features
|Filter & group-aggregate |+ |+ |
|Filter & join |+ |+ |


Gettings Started
----------------
* You can find [documentation](https://akumuli.gitbook.io/docs) here
Expand All @@ -70,15 +76,20 @@ Gettings Started
Supported Platforms
-------------------

Akumuli supports 64 and 32-bit Intel processors. It also works on 64 and 32-bit ARM processors but these architectures are not covered by continous integration.

Pre-built [Debian/RPM packages](https://packagecloud.io/Lazin/Akumuli) for the following platforms
are available via packagecloud:

* Ubuntu 14.04
* Ubuntu 16.04
* Ubuntu 18.04
* Debian Jessie
* Debian Stretch
* CentOS 7
* AMD 64 Ubuntu 14.04
* AMD 64 Ubuntu 16.04
* AMD 64 Ubuntu 18.04
* AMD 64 Debian Jessie
* AMD 64 Debian Stretch
* AMD 64 CentOS 7
* ARM 64 Ubuntu 16.04
* ARM 64 Ubuntu 18.04
* ARM 64 CentOS 7

Docker image is availabe through [Docker Hub](https://hub.docker.com/r/akumuli/akumuli/tags/).

Expand Down
89 changes: 48 additions & 41 deletions akumulid/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ nvolumes=4
struct ConfigFile {
typedef boost::property_tree::ptree PTree;

static boost::filesystem::path default_config_path() {
static boost::filesystem::path get_config_path(boost::optional<std::string> config_path) {
if (config_path) {
return expand_path(*config_path);
}
auto path2cfg = boost::filesystem::path(getenv("HOME"));
path2cfg /= ".akumulid";
return path2cfg;
Expand Down Expand Up @@ -448,9 +451,9 @@ void create_db_files(const char* path,
/** Read configuration file and run server.
* If config file can't be found - report error.
*/
void cmd_run_server() {
void cmd_run_server(boost::optional<std::string> cmd_config_path) {

auto config_path = ConfigFile::default_config_path();
auto config_path = ConfigFile::get_config_path(cmd_config_path);
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
auto ingestion_servers = ConfigFile::get_server_settings(config);
Expand Down Expand Up @@ -537,9 +540,8 @@ void cmd_run_server() {

/** Create database command.
*/
void cmd_create_database(bool test_db=false, bool allocate=false) {
auto config_path = ConfigFile::default_config_path();

void cmd_create_database(boost::optional<std::string> cmd_config_path, bool test_db=false, bool allocate=false) {
auto config_path = ConfigFile::get_config_path(cmd_config_path);
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
auto volumes = ConfigFile::get_nvolumes(config);
Expand All @@ -552,12 +554,11 @@ void cmd_create_database(bool test_db=false, bool allocate=false) {
create_db_files(path.c_str(), volumes, volsize, allocate);
}

void cmd_delete_database() {
auto config_path = ConfigFile::default_config_path();

auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
auto wal_path = ConfigFile::get_wal_settings(config).path;
void cmd_delete_database(boost::optional<std::string> cmd_config_path) {
auto config_path = ConfigFile::get_config_path(cmd_config_path);
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
auto wal_path = ConfigFile::get_wal_settings(config).path;

auto full_path = boost::filesystem::path(path) / "db.akumuli";
if (boost::filesystem::exists(full_path)) {
Expand All @@ -583,10 +584,10 @@ void cmd_delete_database() {
}
}

void cmd_dump_debug_information(const char* outfname) {
auto config_path = ConfigFile::default_config_path();
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
void cmd_dump_debug_information(boost::optional<std::string> cmd_config_path, const char* outfname) {
auto config_path = ConfigFile::get_config_path(cmd_config_path);
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);

auto full_path = boost::filesystem::path(path) / "db.akumuli";
if (boost::filesystem::exists(full_path)) {
Expand Down Expand Up @@ -615,10 +616,10 @@ void cmd_dump_debug_information(const char* outfname) {
}
}

void cmd_dump_recovery_debug_information(const char* outfname) {
auto config_path = ConfigFile::default_config_path();
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);
void cmd_dump_recovery_debug_information(boost::optional<std::string> cmd_config_path, const char* outfname) {
auto config_path = ConfigFile::get_config_path(cmd_config_path);
auto config = ConfigFile::read_config_file(config_path);
auto path = ConfigFile::get_path(config);

auto full_path = boost::filesystem::path(path) / "db.akumuli";
if (boost::filesystem::exists(full_path)) {
Expand Down Expand Up @@ -665,17 +666,10 @@ int main(int argc, char** argv) {
try {
std::locale::global(std::locale("C"));

aku_initialize(&panic_handler, &static_logger);

// Init logger
auto path = ConfigFile::default_config_path();
if (boost::filesystem::exists(path)) {
Logger::init(path.c_str());
}

po::options_description cli_only_options;
cli_only_options.add_options()
("help", "Produce help message")
("config", po::value<std::string>(), "Path to configuration file")
("create", "Create database")
("allocate", "Preallocate disk space")
("delete", "Delete database")
Expand All @@ -691,6 +685,24 @@ int main(int argc, char** argv) {
po::store(po::parse_command_line(argc, argv, cli_only_options), vm);
po::notify(vm);

if (vm.count("help")) {
rich_print(CLI_HELP_MESSAGE);
exit(EXIT_SUCCESS);
}

boost::optional<std::string> cmd_config_path;
if (vm.count("config")) {
cmd_config_path = vm["config"].as<std::string>();
}

aku_initialize(&panic_handler, &static_logger);

// Init logger
auto path = ConfigFile::get_config_path(cmd_config_path);
if (boost::filesystem::exists(path)) {
Logger::init(path.c_str());
}

std::stringstream header;
#ifndef AKU_VERSION
header << "\n\nStarted\n\n";
Expand All @@ -704,11 +716,6 @@ int main(int argc, char** argv) {
header << "\n\n";
logger.info() << header.str();

if (vm.count("help")) {
rich_print(CLI_HELP_MESSAGE);
exit(EXIT_SUCCESS);
}

if (vm.count("init")) {
bool disable_wal = vm.count("disable-wal");
ConfigFile::init_config(path, disable_wal);
Expand All @@ -733,41 +740,41 @@ int main(int argc, char** argv) {
bool allocate = false;
if(vm.count("allocate"))
allocate = true;
cmd_create_database(false, allocate);
cmd_create_database(cmd_config_path, false, allocate);
exit(EXIT_SUCCESS);
}

if (vm.count("CI")) {
cmd_create_database(true);
cmd_create_database(cmd_config_path, true);
exit(EXIT_SUCCESS);
}

if (vm.count("delete")) {
cmd_delete_database();
cmd_delete_database(cmd_config_path);
exit(EXIT_SUCCESS);
}

if (vm.count("debug-dump")) {
auto path = vm["debug-dump"].as<std::string>();
if (path == "stdout") {
cmd_dump_debug_information(nullptr);
cmd_dump_debug_information(cmd_config_path, nullptr);
} else {
cmd_dump_debug_information(path.c_str());
cmd_dump_debug_information(cmd_config_path, path.c_str());
}
exit(EXIT_SUCCESS);
}

if (vm.count("debug-recovery-dump")) {
auto path = vm["debug-recovery-dump"].as<std::string>();
if (path == "stdout") {
cmd_dump_recovery_debug_information(nullptr);
cmd_dump_recovery_debug_information(cmd_config_path, nullptr);
} else {
cmd_dump_recovery_debug_information(path.c_str());
cmd_dump_recovery_debug_information(cmd_config_path, path.c_str());
}
exit(EXIT_SUCCESS);
}

cmd_run_server();
cmd_run_server(cmd_config_path);

logger.info() << "\n\nClean exit\n\n";

Expand Down
19 changes: 12 additions & 7 deletions akumulid/protocolparser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,20 @@ bool RESPProtocolParser::parse_values(RESPStream& stream,
}
}
} else {
if (next == RESPStream::STRING) {
if (!parse_event_value(i)) {
switch (next) {
case RESPStream::STRING:
if (!parse_event_value(i)) {
return false;
}
break;
case RESPStream::_AGAIN:
return false;
default: {
std::string msg;
size_t pos;
std::tie(msg, pos) = rdbuf_.get_error_context("unexpected event format");
BOOST_THROW_EXCEPTION(ProtocolParserError(msg, pos));
}
} else {
std::string msg;
size_t pos;
std::tie(msg, pos) = rdbuf_.get_error_context("unexpected event format");
BOOST_THROW_EXCEPTION(ProtocolParserError(msg, pos));
}
}
}
Expand Down
Loading

0 comments on commit cddfb74

Please sign in to comment.