Skip to content

Commit

Permalink
Merge pull request #316 from akumuli/bugfix-315
Browse files Browse the repository at this point in the history
Event-store bugfix
  • Loading branch information
Lazin authored Sep 6, 2019
2 parents cddfb74 + dfe7e13 commit d111d43
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 63 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ matrix:
before_install:
- docker pull i386/ubuntu:xenial
script:
- docker run --log-driver=none -a stdin -a stdout -a stderr --rm -ti -u root -e FORCE_ARCH=i386 -e GENERIC_BUILD='false' -v "${TRAVIS_BUILD_DIR}":/opt/akumuli -w /opt/akumuli i386/ubuntu:xenial ./CI/runci.sh ubuntu-16.04
- echo "Disabled"
# Disabled because the build times out due to the slow Ubuntu mirror
#- docker run --log-driver=none -a stdin -a stdout -a stderr --rm -ti -u root -e FORCE_ARCH=i386 -e GENERIC_BUILD='false' -v "${TRAVIS_BUILD_DIR}":/opt/akumuli -w /opt/akumuli i386/ubuntu:xenial ./CI/runci.sh ubuntu-16.04
# Container based 64-bit Xenial build
- os: linux
dist: trusty
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
Release notes
=============

Version 0.8.70
--------------

IMPROVEMENT

* Implement event filtering

BUG FIX

* Fix materialization step for events

Version 0.8.69
--------------

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)

set(APP_VERSION_MAJOR "0")
set(APP_VERSION_MINOR "8")
set(APP_VERSION_PATCH "69")
set(APP_VERSION_PATCH "70")

set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}")
add_definitions(-DAKU_VERSION="${APP_VERSION}")
Expand Down
2 changes: 1 addition & 1 deletion akumulid/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ void cmd_run_server(boost::optional<std::string> cmd_config_path) {
}

auto connection = std::make_shared<AkumuliConnection>(full_path.c_str(), params);
auto qproc = std::make_shared<QueryProcessor>(connection, 1000);
auto qproc = std::make_shared<QueryProcessor>(connection, 2048);

SignalHandler sighandler;
int srvid = 0;
Expand Down
10 changes: 6 additions & 4 deletions akumulid/protocolparser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ bool RESPProtocolParser::parse_values(RESPStream& stream,
return true;
};
auto parse_event_value = [&](int at) {
std::tie(success, bytes_read) = stream.read_string(buf, buflen);
event_inp_buf_.resize(RESPStream::STRING_LENGTH_MAX);
std::tie(success, bytes_read) = stream.read_string(event_inp_buf_.data(), event_inp_buf_.size());
if (!success) {
return false;
}
Expand All @@ -425,7 +426,8 @@ bool RESPProtocolParser::parse_values(RESPStream& stream,
std::tie(msg, pos) = rdbuf_.get_error_context("event value is too big");
BOOST_THROW_EXCEPTION(ProtocolParserError(msg, pos));
}
events[at].assign(buf, buf + bytes_read);
events[at].assign(event_inp_buf_.data(),
event_inp_buf_.data() + bytes_read);
return true;
};
auto next = stream.next_type();
Expand Down Expand Up @@ -601,8 +603,8 @@ void RESPProtocolParser::worker() {
evt.payload.size = static_cast<u16>(len); // len guaranteed to fit
evt.timestamp = sample.timestamp;
evt.paramid = paramids_[i];
event_buf_.resize(len);
auto pevt = reinterpret_cast<aku_Sample*>(event_buf_.data());
event_out_buf_.resize(len);
auto pevt = reinterpret_cast<aku_Sample*>(event_out_buf_.data());
memcpy(pevt, &evt, sizeof(evt));
memcpy(pevt->payload.data, events_[i].data(), events_[i].size());
status = consumer_->write(*pevt);
Expand Down
3 changes: 2 additions & 1 deletion akumulid/protocolparser.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class RESPProtocolParser {
u64 paramids_[AKU_LIMITS_MAX_ROW_WIDTH];
double values_ [AKU_LIMITS_MAX_ROW_WIDTH];
std::string events_ [AKU_LIMITS_MAX_ROW_WIDTH];
std::vector<char> event_buf_;
std::vector<char> event_inp_buf_;
std::vector<char> event_out_buf_;

//! Process frames from queue
void worker();
Expand Down
2 changes: 1 addition & 1 deletion akumulid/query_results_pooler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct QueryResultsPooler : ReadOperation {
std::vector<char> rdbuf_; //! Read buffer
int rdbuf_pos_; //! Read position in buffer
int rdbuf_top_; //! Last initialized item _index_ in `rdbuf_`
static const size_t DEFAULT_RDBUF_SIZE_ = 1000u;
static const size_t DEFAULT_RDBUF_SIZE_ = 1024;
static const size_t DEFAULT_ITEM_SIZE_ = sizeof(aku_Sample);
ApiEndpoint endpoint_;
bool error_produced_;
Expand Down
25 changes: 0 additions & 25 deletions functests/storage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,31 +760,6 @@ int main(int argc, const char** argv) {
storage.close();
}

{
storage.open();
std::vector<DataPoint> exppoints = {
{ "20150101T000020.000000000", "cpu key=2", 2.0 },
{ "20150101T000021.000000000", "cpu key=3", 2.1 },
{ "20150101T000022.000000000", "cpu key=4", 2.2 },
{ "20150101T000023.000000000", "cpu key=5", 2.3 },
};
std::vector<DataPoint> newpoints = {
{ "20150101T000024.000000000", "cpu key=1", 2.4 },
{ "20150101T000025.000000000", "cpu key=2", 2.5 },
{ "20150101T000026.000000000", "cpu key=3", 2.6 },
{ "20150101T000027.000000000", "cpu key=4", 2.7 },
{ "20150101T000028.000000000", "cpu key=5", 2.8 },
{ "20150101T000029.000000000", "cpu key=1", 2.8 },
};
std::vector<std::string> ids = {
"cpu key=1",
"cpu key=2",
"cpu key=3",
"cpu key=4",
"cpu key=5",
};
}

std::cout << "OK!" << std::endl;

storage.delete_all();
Expand Down
26 changes: 4 additions & 22 deletions libakumuli/metadatastorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void MetadataStorage::upsert_rescue_points(std::unordered_map<aku_ParamId, std::
"INSERT OR REPLACE INTO akumuli_rescue_points (storage_id, addr0, addr1, addr2, addr3, addr4, addr5, addr6, addr7) VALUES ";
size_t ix = 0;
for (auto const& kv: batch) {
query << "( " << kv.first;
query << "( " << static_cast<i64>(kv.first);
for (auto id: kv.second) {
if (id == ~0ull) {
// Values that big can't be represented in SQLite, -1 value should be interpreted as EMPTY_ADDR,
Expand Down Expand Up @@ -527,26 +527,8 @@ void MetadataStorage::insert_new_names(std::vector<SeriesT> &&items) {
}

boost::optional<i64> MetadataStorage::get_prev_largest_id() {
auto query_max = "SELECT max(storage_id) FROM akumuli_series;";
auto query_min = "SELECT max(storage_id) FROM akumuli_series;";
auto query_max = "SELECT max(abs(storage_id)) FROM akumuli_series;";
i64 max_id = 0;
i64 min_id = 0;
try {
auto results = select_query(query_min);
auto row = results.at(0);
if (row.empty()) {
AKU_PANIC("Can't get max storage id");
}
auto id = row.at(0);
if (id == "") {
// Table is empty
return boost::optional<i64>();
}
min_id = boost::lexical_cast<i64>(id);
} catch(...) {
Logger::msg(AKU_LOG_ERROR, boost::current_exception_diagnostic_information().c_str());
AKU_PANIC("Can't get max storage id");
}
try {
auto results = select_query(query_max);
auto row = results.at(0);
Expand All @@ -563,7 +545,7 @@ boost::optional<i64> MetadataStorage::get_prev_largest_id() {
Logger::msg(AKU_LOG_ERROR, boost::current_exception_diagnostic_information().c_str());
AKU_PANIC("Can't get max storage id");
}
return std::max(max_id, -1*min_id);
return max_id;
}

aku_Status MetadataStorage::load_matcher_data(SeriesMatcherBase& matcher) {
Expand All @@ -575,7 +557,7 @@ aku_Status MetadataStorage::load_matcher_data(SeriesMatcherBase& matcher) {
continue;
}
auto series = row.at(0);
auto id = boost::lexical_cast<u64>(row.at(1));
auto id = boost::lexical_cast<i64>(row.at(1));
matcher._add(series, id);
}
} catch(...) {
Expand Down
27 changes: 27 additions & 0 deletions libakumuli/query_processing/queryparser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,23 @@ static std::tuple<aku_Status, std::string, ErrorMsg> parse_select_events_stmt(bo
return std::make_tuple(AKU_EQUERY_PARSING_ERROR, "", "Query object doesn't have a 'select-events' field");
}

/**
 * Extract the optional "filter" field of a `select-events` query.
 * Returns (status, regex-string, error-message). The regex string is empty
 * when no filter was supplied; AKU_EBAD_ARG is returned when the supplied
 * pattern is not a valid ECMAScript regular expression.
 */
static std::tuple<aku_Status, std::string, ErrorMsg> parse_select_events_filter_field(boost::property_tree::ptree const& ptree) {
    auto node = ptree.get_child_optional("filter");
    if (!node || !node->empty()) {
        // Field is missing or is not a plain value - treat as "no filter".
        return std::make_tuple(AKU_SUCCESS, "", "");
    }
    auto pattern = node->get_value<std::string>("");
    if (pattern.empty()) {
        return std::make_tuple(AKU_SUCCESS, "", "");
    }
    // Compile the pattern eagerly so an invalid regex fails the query with a
    // readable message instead of surfacing deep inside the scan pipeline.
    try {
        std::regex checked(pattern.data(), std::regex_constants::ECMAScript);
    } catch (const std::regex_error& err) {
        return std::make_tuple(AKU_EBAD_ARG, "", err.what());
    }
    return std::make_tuple(AKU_SUCCESS, pattern, ErrorMsg());
}

/** Parse `join` statement, format:
* { "join": [ "metric1", "metric2", ... ], ... }
*/
Expand Down Expand Up @@ -1349,6 +1366,16 @@ std::tuple<aku_Status, ReshapeRequest, ErrorMsg> QueryParser::parse_select_event
return std::make_tuple(status, result, error);
}

// Filter
std::string flt;
std::tie(status, flt, error) = parse_select_events_filter_field(ptree);
if (status != AKU_SUCCESS) {
return std::make_tuple(status, result, error);
}
if (!flt.empty()) {
result.select.event_body_regex = flt;
}

// Group-by statement
GroupByOpType op;
std::vector<std::string> tags;
Expand Down
30 changes: 27 additions & 3 deletions libakumuli/query_processing/queryplan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ struct ScanEventsProcessingStep : ProcessingPrelude {
aku_Timestamp begin_;
aku_Timestamp end_;
std::vector<aku_ParamId> ids_;
std::string regex_;

//! C-tor (1), create scan without filter
template<class T>
ScanEventsProcessingStep(aku_Timestamp begin, aku_Timestamp end, T&& t)
: begin_(begin)
Expand All @@ -99,7 +101,20 @@ struct ScanEventsProcessingStep : ProcessingPrelude {
{
}

//! C-tor (2), create scan with filter
template<class T>
ScanEventsProcessingStep(aku_Timestamp begin, aku_Timestamp end, const std::string& exp, T&& t)
: begin_(begin)
, end_(end)
, ids_(std::forward<T>(t))
, regex_(exp)
{
}

virtual aku_Status apply(const ColumnStore& cstore) {
if (!regex_.empty()) {
return cstore.filter_events(ids_, begin_, end_, regex_, &scanlist_);
}
return cstore.scan_events(ids_, begin_, end_, &scanlist_);
}

Expand Down Expand Up @@ -991,9 +1006,18 @@ static std::tuple<aku_Status, std::unique_ptr<IQueryPlan>> scan_events_query_pla
}

std::unique_ptr<ProcessingPrelude> t1stage;
t1stage.reset(new ScanEventsProcessingStep (req.select.begin,
req.select.end,
req.select.columns.at(0).ids));
if (req.select.event_body_regex.empty()) {
// Regex filter is not set
t1stage.reset(new ScanEventsProcessingStep (req.select.begin,
req.select.end,
req.select.columns.at(0).ids));
}
else {
t1stage.reset(new ScanEventsProcessingStep (req.select.begin,
req.select.end,
req.select.event_body_regex,
req.select.columns.at(0).ids));
}

std::unique_ptr<MaterializationStep> t2stage;
if (req.group_by.enabled) {
Expand Down
1 change: 1 addition & 0 deletions libakumuli/queryprocessor_framework.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ std::shared_ptr<Node> create_node(std::string tag, boost::property_tree::ptree c

MutableSample::MutableSample(const aku_Sample* source)
: istuple_((source->payload.type & AKU_PAYLOAD_TUPLE) == AKU_PAYLOAD_TUPLE)
, orig_(nullptr)
{
auto size = std::max(sizeof(aku_Sample), static_cast<size_t>(source->payload.size));
memcpy(payload_.raw, source, size);
Expand Down
4 changes: 3 additions & 1 deletion libakumuli/queryprocessor_framework.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ struct Selection {
aku_Timestamp begin;
aku_Timestamp end;
bool events;
std::string event_body_regex;

//! This matcher should be used by Join-statement
std::shared_ptr<PlainSeriesMatcher> matcher;
Expand Down Expand Up @@ -175,7 +176,7 @@ struct Node;

struct MutableSample {
static constexpr size_t MAX_PAYLOAD_SIZE = sizeof(double)*58;
static constexpr size_t MAX_SIZE = sizeof(aku_Sample) + MAX_PAYLOAD_SIZE;
static constexpr size_t MAX_SIZE = 1024 + sizeof(aku_Sample);
union Payload {
aku_Sample sample;
char raw[MAX_SIZE];
Expand All @@ -184,6 +185,7 @@ struct MutableSample {
u32 size_;
u32 bitmap_;
const bool istuple_;
const aku_Sample *orig_;

MutableSample(const aku_Sample* source);

Expand Down
11 changes: 11 additions & 0 deletions libakumuli/storage_engine/column_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ class ColumnStore : public std::enable_shared_from_this<ColumnStore> {
});
}

//! Build one regex-filtering event iterator per series id and collect them
//! into `dest` (delegates the per-series traversal to `iterate`).
//! `expr` is the ECMAScript pattern applied to each event body.
aku_Status filter_events(std::vector<aku_ParamId> const& ids,
                         aku_Timestamp begin,
                         aku_Timestamp end,
                         const std::string& expr,
                         std::vector<std::unique_ptr<BinaryDataOperator>>* dest) const
{
    // Capture by value: the callback must stay valid even if `iterate`
    // defers its invocation.
    auto make_filter = [begin, end, expr](const NBTreeExtentsList& elist) {
        auto it = elist.filter_binary(begin, end, expr);
        return std::make_tuple(AKU_SUCCESS, std::move(it));
    };
    return iterate(ids, dest, make_filter);
}

aku_Status filter(std::vector<aku_ParamId> const& ids,
aku_Timestamp begin,
aku_Timestamp end,
Expand Down
50 changes: 50 additions & 0 deletions libakumuli/storage_engine/nbtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <sstream>
#include <stack>
#include <array>
#include <regex>

// App
#include "nbtree.h"
Expand Down Expand Up @@ -1664,6 +1665,48 @@ class BinaryDataIterator : public BinaryDataOperator {
}
};

/**
 * Event iterator decorator that yields only the events whose body matches
 * the regular expression. Wraps another BinaryDataOperator and reads from
 * it one element at a time, skipping non-matching values.
 */
class BinaryDataFilter : public BinaryDataOperator {
    std::unique_ptr<BinaryDataOperator> it_;  //! Wrapped iterator (owned)
    std::regex regex_;                        //! Compiled ECMAScript pattern
public:
    //! C-tor, takes ownership of the wrapped iterator.
    //! Throws std::regex_error if `regex` is not a valid ECMAScript pattern.
    BinaryDataFilter(std::unique_ptr<BinaryDataOperator> base, const std::string& regex)
        : it_(std::move(base))
        , regex_(regex.data(), std::regex_constants::ECMAScript)
    {
    }

    //! Read up to `size` matching events into `destts`/`destxs`.
    //! Returns the last status of the wrapped iterator and the number of
    //! elements actually written.
    virtual std::tuple<aku_Status, size_t> read(aku_Timestamp *destts, std::string *destxs, size_t size) {
        aku_Timestamp ts;
        std::string xs;
        // Initialize both values: if `size` is 0 the loop body never runs and
        // the previous code returned an uninitialized `status` (UB).
        aku_Status status = AKU_SUCCESS;
        size_t len = 0;
        size_t outlen = 0;
        while (size != 0) {
            std::tie(status, len) = it_->read(&ts, &xs, 1);
            if (status != AKU_SUCCESS) {
                if (status == AKU_ENO_DATA && len == 0) {
                    // Wrapped iterator is fully exhausted.
                    break;
                } else if (status != AKU_ENO_DATA) {
                    // Hard error - propagate immediately.
                    break;
                }
                // AKU_ENO_DATA with one trailing element: fall through and
                // process it; the next read terminates the loop.
            }
            if (len == 1) {
                if (std::regex_search(xs, regex_)) {
                    outlen++;
                    *destts++ = ts;
                    *destxs++ = xs;
                    size--;
                }
            }
        }
        return std::make_tuple(status, outlen);
    }

    virtual Direction get_direction() {
        return it_->get_direction();
    }
};


// ///////// //
// IOVecLeaf //
Expand Down Expand Up @@ -3891,6 +3934,13 @@ std::unique_ptr<BinaryDataOperator> NBTreeExtentsList::search_binary(aku_Timesta
return res;
}

//! Create an event iterator over the given time range that yields only the
//! events whose body matches the ECMAScript `regex` pattern.
std::unique_ptr<BinaryDataOperator> NBTreeExtentsList::filter_binary(aku_Timestamp begin, aku_Timestamp end, const std::string& regex) const {
    // Wrap the plain binary scan iterator into the regex-filtering decorator.
    std::unique_ptr<BinaryDataOperator> filtered(
            new BinaryDataFilter(search_binary(begin, end), regex));
    return filtered;
}

std::unique_ptr<RealValuedOperator> NBTreeExtentsList::filter(aku_Timestamp begin,
aku_Timestamp end,
const ValueFilter& filter) const
Expand Down
Loading

0 comments on commit d111d43

Please sign in to comment.