Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
[tests] add async cases for timestamp test
Browse files Browse the repository at this point in the history
  • Loading branch information
KFilipek committed Nov 29, 2022
1 parent 8459a93 commit 23e290a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
1 change: 0 additions & 1 deletion tests/common/stream_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ struct pmemstream_helpers_type {
rrt = it->second;
}
for (const auto &e : data) {

auto [ret, entry] = stream.append(region, e, rrt);
UT_ASSERTeq(ret, 0);
}
Expand Down
89 changes: 84 additions & 5 deletions tests/unittest/timestamp.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2022, Intel Corporation */

#include <random>

#include "libpmemstream.h"
#include "rapidcheck_helpers.hpp"
#include "stream_helpers.hpp"
Expand All @@ -11,6 +13,31 @@
* timestamp - unit test for testing method pmemstream_entry_timestamp()
*/

void multithreaded_asynchronous_append(pmemstream_test_base &stream, const std::vector<pmemstream_region> &regions,
const std::vector<std::vector<std::string>> &data)
{
using future_type = decltype(stream.helpers.async_append(regions[0], data[0]));
std::vector<std::vector<future_type>> futures(data.size());

parallel_exec(data.size(), [&](size_t thread_id) {
for (auto &chunk : data) {
futures[thread_id].emplace_back(stream.helpers.async_append(regions[thread_id], chunk));
}
});

for (auto &future_sequence : futures) {
std::mt19937_64 g(*rc::gen::arbitrary<size_t>());
std::shuffle(future_sequence.begin(), future_sequence.end(), g);
}

parallel_exec(data.size(), [&](size_t thread_id) {
for (auto &fut : futures[thread_id]) {
while (fut.poll() != FUTURE_STATE_COMPLETE)
;
}
});
}

void multithreaded_synchronous_append(pmemstream_test_base &stream, const std::vector<pmemstream_region> &regions,
const std::vector<std::vector<std::string>> &data)
{
Expand All @@ -37,8 +64,7 @@ std::tuple<std::vector<pmemstream_region>, size_t> generate_and_append_data(pmem
concurrency_level, rc::gen::arbitrary<std::vector<std::string>>());

if (async) {
// XXX: multithreaded_asynchronous_append(stream, regions, data);
UT_ASSERT(false);
multithreaded_asynchronous_append(stream, regions, data);
} else {
multithreaded_synchronous_append(stream, regions, data);
}
Expand All @@ -52,9 +78,9 @@ std::tuple<std::vector<pmemstream_region>, size_t> generate_and_append_data(pmem
}

size_t remove_random_region(pmemstream_with_multi_empty_regions &stream, std::vector<pmemstream_region> &regions,
size_t concurrency_level)
test_config_type &config)
{
size_t pos = *rc::gen::inRange<size_t>(0, concurrency_level);
size_t pos = *rc::gen::inRange<size_t>(0, get_concurrency_level(config, regions));
auto region_to_remove = regions[pos];
auto region_size = stream.sut.region_size(region_to_remove);
UT_ASSERTeq(stream.helpers.remove_region(region_to_remove.offset), 0);
Expand Down Expand Up @@ -104,7 +130,7 @@ int main(int argc, char *argv[])
generate_and_append_data(stream, test_config, false /* sync */);
auto concurrency_level = get_concurrency_level(test_config, regions);

auto region_size = remove_random_region(stream, regions, concurrency_level);
auto region_size = remove_random_region(stream, regions, test_config);

/* Global ordering validation. */
if (regions.size() >= 1)
Expand All @@ -115,6 +141,59 @@ int main(int argc, char *argv[])

regions.push_back(stream.helpers.initialize_single_region(region_size, extra_data));

UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
elements * (concurrency_level - 1) + extra_data.size());
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
});

ret += rc::check("timestamp values should increase in each region after asynchronous append",
[&](pmemstream_with_multi_empty_regions &&stream) {
auto [regions, elements] =
generate_and_append_data(stream, test_config, true /* async */);

/* Single region ordering validation. */
for (auto &region : regions) {
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps({region}));
}
});

ret += rc::check(
"timestamp values should globally increase in multi-region environment after asynchronous append",
[&](pmemstream_with_multi_empty_regions &&stream) {
auto [regions, elements] =
generate_and_append_data(stream, test_config, true /* async */);

/* Global ordering validation */
UT_ASSERT(stream.helpers.validate_timestamps_no_gaps(regions));
});

ret += rc::check(
"timestamp values should globally increase in multi-region environment after asynchronous append to respawned region",
[&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &extra_data) {
RC_PRE(extra_data.size() > 0);
auto [regions, elements] =
generate_and_append_data(stream, test_config, true /* async */);
auto concurrency_level = get_concurrency_level(test_config, regions);

auto region_size = remove_random_region(stream, regions, test_config);

/* Global ordering validation. */
if (regions.size() >= 1)
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));

UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
elements * (concurrency_level - 1));

{
auto [ret, region] = stream.sut.region_allocate(region_size);
UT_ASSERTeq(ret, 0);
regions.push_back(region);

auto future = stream.helpers.async_append(region, extra_data);
while (future.poll() != FUTURE_STATE_COMPLETE)
;
}

UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
elements * (concurrency_level - 1) + extra_data.size());
UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
Expand Down

0 comments on commit 23e290a

Please sign in to comment.