Merge branch 'issue233'. Closes: #233
spanezz committed Sep 1, 2020
2 parents 0dabb5f + bda129a commit 0bfb801
Showing 32 changed files with 288 additions and 222 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
@@ -3,6 +3,10 @@
* Fixed error reading offline archives (#232)
* Allow creating an arkimet session with `force_dir_segments=True` to always
store each data item in a separate file, to help arkimaps prototyping.
* New `eatmydata = yes` dataset configuration option (disabled by default) that
turns off multi-process dataset access and crash-consistency guarantees in
favour of speed. This is useful when creating, for example, a temporary work
dataset in a temporary directory (#233)

# New in version 1.28

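To illustrate the new option, a throwaway dataset enabling it might be configured
as in the sketch below. Only the `eatmydata` key comes from this commit; the
section name and the `type`, `format` and `step` values are example placeholders.

```ini
# hypothetical scratch dataset in a temporary directory; eatmydata
# trades crash safety and multi-process access for import speed
[scratch]
type = iseg
format = grib
step = daily
eatmydata = yes
```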
26 changes: 24 additions & 2 deletions arki/dataset-writer-test.cc
@@ -27,10 +27,10 @@ namespace {
struct ForceDirMockDataSession : public arki::dataset::Session
{
public:
std::shared_ptr<arki::segment::Writer> segment_writer(const std::string& format, const std::string& root, const std::string& relpath) override
std::shared_ptr<arki::segment::Writer> segment_writer(const segment::WriterConfig& writer_config, const std::string& format, const std::string& root, const std::string& relpath) override
{
std::string abspath = str::joinpath(root, relpath);
return std::shared_ptr<arki::segment::Writer>(new arki::segment::dir::HoleWriter(format, root, relpath, abspath));
return std::shared_ptr<arki::segment::Writer>(new arki::segment::dir::HoleWriter(writer_config, format, root, relpath, abspath));
}
};

@@ -438,5 +438,27 @@ this->add_method("test_acquire", [](Fixture& f) {
// TODO: add tests for test_acquire
});

this->add_method("import_eatmydata", [](Fixture& f) {
f.cfg->set("eatmydata", "yes");

{
auto ds = f.config().create_writer();
for (auto& md: f.td.mds)
wassert(actual(*ds).import(*md));

dataset::WriterBatch batch;
for (auto& md: f.td.mds)
batch.emplace_back(make_shared<dataset::WriterBatchElement>(*md));
wassert(ds->acquire_batch(batch, dataset::REPLACE_ALWAYS));
}

metadata::Collection mdc(*f.config().create_reader(), "");
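// with simple datasets the REPLACE_ALWAYS batch above ends up appended as
// duplicates (3 + 3 elements); the indexed dataset types replace in place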
if (f.cfg->value("type") == "simple")
wassert(actual(mdc.size()) == 6u);
else
wassert(actual(mdc.size()) == 3u);
});


}
}
22 changes: 13 additions & 9 deletions arki/dataset/index/manifest.cc
@@ -514,7 +514,8 @@ class PlainManifest : public Manifest
for (vector<Info>::const_iterator i = info.begin();
i != info.end(); ++i)
i->write(out);
out.fdatasync();
if (!dataset->eatmydata)
out.fdatasync();
out.close();

if (::rename(pathname.c_str(), str::joinpath(m_path, "MANIFEST").c_str()) < 0)
@@ -555,16 +556,19 @@ class SqliteManifest : public Manifest
mutable utils::sqlite::SQLiteDB m_db;
utils::sqlite::InsertQuery m_insert;

void setupPragmas()
void setup_pragmas()
{
// Also, since the way we do inserts causes no trouble if a reader reads a
// partial insert, we do not need read locking
//m_db.exec("PRAGMA read_uncommitted = 1");
if (dataset->eatmydata)
{
m_db.exec("PRAGMA synchronous = OFF");
m_db.exec("PRAGMA journal_mode = MEMORY");
} else {
// Use a WAL journal, which allows reads and writes together
m_db.exec("PRAGMA journal_mode = WAL");
}
// Use new features: we both write and read this index ourselves, so we do
// not need to support sqlite < 3.3.0
m_db.exec("PRAGMA legacy_file_format = 0");
// Enable WAL journaling, which doesn't lock reads while writing
m_db.exec("PRAGMA journal_mode = WAL");
}

void initQueries()
@@ -615,7 +619,7 @@ class SqliteManifest : public Manifest
throw std::runtime_error("opening archive index: index " + pathname + " does not exist");

m_db.open(pathname);
setupPragmas();
setup_pragmas();

initQueries();
}
@@ -633,7 +637,7 @@
bool need_create = !sys::access(pathname, F_OK);

m_db.open(pathname);
setupPragmas();
setup_pragmas();

if (need_create)
initDB();
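For context on the trade-off behind `eatmydata`: `PRAGMA synchronous = OFF` makes
SQLite skip fsync at commit time, and `PRAGMA journal_mode = MEMORY` keeps the
rollback journal in RAM, so a crash while writing can leave the index corrupted;
the default WAL mode keeps durability and lets readers proceed while a writer
commits. Below is a minimal self-contained sketch of the same switch using the
plain sqlite3 C API rather than arkimet's `utils::sqlite` wrapper; the
`exec_or_throw` and `setup_pragmas` names are illustrative, not arkimet
functions.

```cpp
// Standalone sketch of the pragma profiles switched on dataset->eatmydata.
#include <sqlite3.h>
#include <stdexcept>
#include <string>

static void exec_or_throw(sqlite3* db, const char* sql)
{
    char* err = nullptr;
    if (sqlite3_exec(db, sql, nullptr, nullptr, &err) != SQLITE_OK)
    {
        std::string msg(err ? err : "unknown sqlite error");
        sqlite3_free(err);
        throw std::runtime_error(msg);
    }
}

void setup_pragmas(sqlite3* db, bool eatmydata)
{
    if (eatmydata)
    {
        // no fsync on commit, rollback journal kept in RAM: fast, but a
        // crash while writing can corrupt the database file
        exec_or_throw(db, "PRAGMA synchronous = OFF");
        exec_or_throw(db, "PRAGMA journal_mode = MEMORY");
    } else {
        // WAL keeps durability and does not block readers during writes
        exec_or_throw(db, "PRAGMA journal_mode = WAL");
    }
}
```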
20 changes: 9 additions & 11 deletions arki/dataset/iseg/index.cc
@@ -145,17 +145,14 @@ std::set<types::Code> Index::unique_codes() const

void Index::setup_pragmas()
{
// Faster but riskier, since we do not have a flagfile to trap
// interrupted transactions
//m_db.exec("PRAGMA synchronous = OFF");
// Faster but riskier, since we do not have a flagfile to trap
// interrupted transactions
//m_db.exec("PRAGMA journal_mode = MEMORY");
// Truncate the journal instead of delete: faster on many file systems
// m_db.exec("PRAGMA journal_mode = TRUNCATE");
// Zero the header of the journal instead of delete: faster on many file systems
// Use a WAL journal, which allows reads and writes together
m_db.exec("PRAGMA journal_mode = WAL");
if (dataset->eatmydata)
{
m_db.exec("PRAGMA synchronous = OFF");
m_db.exec("PRAGMA journal_mode = MEMORY");
} else {
// Use a WAL journal, which allows reads and writes together
m_db.exec("PRAGMA journal_mode = WAL");
}
// Also, since the way we do inserts causes no trouble if a reader reads a
// partial insert, we do not need read locking
//m_db.exec("PRAGMA read_uncommitted = 1");
@@ -483,6 +480,7 @@ WIndex::WIndex(std::shared_ptr<iseg::Dataset> dataset, const std::string& data_r
} else {
m_db.open(index_pathname);
if (dataset->trace_sql) m_db.trace();
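// new: apply the pragmas also when opening an existing index, not only
// when creating a new one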
setup_pragmas();
init_others();
}
}
66 changes: 39 additions & 27 deletions arki/dataset/iseg/writer.cc
@@ -39,7 +39,7 @@ struct AppendSegment
{
}

WriterAcquireResult acquire_replace_never(Metadata& md, index::SummaryCache& scache, bool drop_cached_data_on_commit)
WriterAcquireResult acquire_replace_never(Metadata& md, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

@@ -51,7 +51,7 @@
}
// Invalidate the summary cache for this month
scache.invalidate(md);
segment->append(md, drop_cached_data_on_commit);
segment->append(md);
segment->commit();
p_idx.commit();
return ACQ_OK;
@@ -62,15 +62,15 @@
}
}

WriterAcquireResult acquire_replace_always(Metadata& md, index::SummaryCache& scache, bool drop_cached_data_on_commit)
WriterAcquireResult acquire_replace_always(Metadata& md, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

try {
idx.replace(md, segment->next_offset());
// Invalidate the summary cache for this month
scache.invalidate(md);
segment->append(md, drop_cached_data_on_commit);
segment->append(md);
segment->commit();
p_idx.commit();
return ACQ_OK;
@@ -81,7 +81,7 @@
}
}

WriterAcquireResult acquire_replace_higher_usn(Metadata& md, index::SummaryCache& scache, bool drop_cached_data_on_commit)
WriterAcquireResult acquire_replace_higher_usn(Metadata& md, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

@@ -112,12 +112,12 @@

// Replace, reusing the pending datafile transaction from earlier
idx.replace(md, segment->next_offset());
segment->append(md, drop_cached_data_on_commit);
segment->append(md);
segment->commit();
p_idx.commit();
return ACQ_OK;
} else {
segment->append(md, drop_cached_data_on_commit);
segment->append(md);
// Invalidate the summary cache for this month
scache.invalidate(md);
segment->commit();
@@ -131,7 +131,7 @@
}
}

void acquire_batch_replace_never(WriterBatch& batch, index::SummaryCache& scache, bool drop_cached_data_on_commit)
void acquire_batch_replace_never(WriterBatch& batch, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

@@ -149,7 +149,7 @@

// Invalidate the summary cache for this month
scache.invalidate(e->md);
segment->append(e->md, drop_cached_data_on_commit);
segment->append(e->md);
e->result = ACQ_OK;
e->dataset_name = dataset->name();
}
@@ -163,7 +163,7 @@
p_idx.commit();
}

void acquire_batch_replace_always(WriterBatch& batch, index::SummaryCache& scache, bool drop_cached_data_on_commit)
void acquire_batch_replace_always(WriterBatch& batch, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

@@ -174,7 +174,7 @@
idx.replace(e->md, segment->next_offset());
// Invalidate the summary cache for this month
scache.invalidate(e->md);
segment->append(e->md, drop_cached_data_on_commit);
segment->append(e->md);
e->result = ACQ_OK;
e->dataset_name = dataset->name();
}
@@ -188,7 +188,7 @@
p_idx.commit();
}

void acquire_batch_replace_higher_usn(WriterBatch& batch, index::SummaryCache& scache, bool drop_cached_data_on_commit)
void acquire_batch_replace_higher_usn(WriterBatch& batch, index::SummaryCache& scache)
{
Pending p_idx = idx.begin_transaction();

@@ -232,14 +232,14 @@

// Replace, reusing the pending datafile transaction from earlier
idx.replace(e->md, segment->next_offset());
segment->append(e->md, drop_cached_data_on_commit);
segment->append(e->md);
e->result = ACQ_OK;
e->dataset_name = dataset->name();
continue;
} else {
// Invalidate the summary cache for this month
scache.invalidate(e->md);
segment->append(e->md, drop_cached_data_on_commit);
segment->append(e->md);
e->result = ACQ_OK;
e->dataset_name = dataset->name();
}
@@ -271,18 +271,18 @@ Writer::~Writer()

std::string Writer::type() const { return "iseg"; }

std::unique_ptr<AppendSegment> Writer::file(const Metadata& md)
std::unique_ptr<AppendSegment> Writer::file(const segment::WriterConfig& writer_config, const Metadata& md)
{
const core::Time& time = md.get<types::reftime::Position>()->time;
string relpath = dataset().step()(time) + "." + dataset().format;
return file(relpath);
return file(writer_config, relpath);
}

std::unique_ptr<AppendSegment> Writer::file(const std::string& relpath)
std::unique_ptr<AppendSegment> Writer::file(const segment::WriterConfig& writer_config, const std::string& relpath)
{
sys::makedirs(str::dirname(str::joinpath(dataset().path, relpath)));
std::shared_ptr<dataset::AppendLock> append_lock(dataset().append_lock_segment(relpath));
auto segment = dataset().session->segment_writer(dataset().format, dataset().path, relpath);
auto segment = dataset().session->segment_writer(writer_config, dataset().format, dataset().path, relpath);
return std::unique_ptr<AppendSegment>(new AppendSegment(m_dataset, append_lock, segment));
}

@@ -297,12 +297,16 @@ WriterAcquireResult Writer::acquire(Metadata& md, const AcquireConfig& cfg)

ReplaceStrategy replace = cfg.replace == REPLACE_DEFAULT ? dataset().default_replace_strategy : cfg.replace;

auto segment = file(md);
segment::WriterConfig writer_config;
writer_config.drop_cached_data_on_commit = cfg.drop_cached_data_on_commit;
writer_config.eatmydata = dataset().eatmydata;

auto segment = file(writer_config, md);
switch (replace)
{
case REPLACE_NEVER: return segment->acquire_replace_never(md, scache, cfg.drop_cached_data_on_commit);
case REPLACE_ALWAYS: return segment->acquire_replace_always(md, scache, cfg.drop_cached_data_on_commit);
case REPLACE_HIGHER_USN: return segment->acquire_replace_higher_usn(md, scache, cfg.drop_cached_data_on_commit);
case REPLACE_NEVER: return segment->acquire_replace_never(md, scache);
case REPLACE_ALWAYS: return segment->acquire_replace_always(md, scache);
case REPLACE_HIGHER_USN: return segment->acquire_replace_higher_usn(md, scache);
default:
{
stringstream ss;
@@ -324,22 +328,26 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg)
return;
}

segment::WriterConfig writer_config;
writer_config.drop_cached_data_on_commit = cfg.drop_cached_data_on_commit;
writer_config.eatmydata = dataset().eatmydata;

std::map<std::string, WriterBatch> by_segment = batch_by_segment(batch);

// Import segment by segment
for (auto& s: by_segment)
{
auto segment = file(s.first);
auto segment = file(writer_config, s.first);
switch (replace)
{
case REPLACE_NEVER:
segment->acquire_batch_replace_never(s.second, scache, cfg.drop_cached_data_on_commit);
segment->acquire_batch_replace_never(s.second, scache);
break;
case REPLACE_ALWAYS:
segment->acquire_batch_replace_always(s.second, scache, cfg.drop_cached_data_on_commit);
segment->acquire_batch_replace_always(s.second, scache);
break;
case REPLACE_HIGHER_USN:
segment->acquire_batch_replace_higher_usn(s.second, scache, cfg.drop_cached_data_on_commit);
segment->acquire_batch_replace_higher_usn(s.second, scache);
break;
default: throw std::runtime_error("programming error: unsupported replace value " + std::to_string(replace));
}
@@ -348,7 +356,11 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg)

void Writer::remove(Metadata& md)
{
auto segment = file(md);
segment::WriterConfig writer_config;
writer_config.drop_cached_data_on_commit = false;
writer_config.eatmydata = dataset().eatmydata;

auto segment = file(writer_config, md);

const types::source::Blob* source = md.has_source_blob();
if (!source)
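The repeated change across writer.cc above is one refactoring: the per-call
`drop_cached_data_on_commit` boolean disappears from the `acquire_*` methods,
and both it and the new dataset-level `eatmydata` flag travel in a
`segment::WriterConfig` built once per operation and passed down to the segment
writer. A sketch of the struct shape this implies, inferred from the fields
assigned in this diff (the real definition lives in the segment headers, which
are not part of this excerpt, and the member defaults are assumptions):

```cpp
// Inferred shape of segment::WriterConfig; not copied from the headers.
namespace arki { namespace segment {

struct WriterConfig
{
    // was a per-call argument to every acquire_*/acquire_batch_* method
    bool drop_cached_data_on_commit = false;
    // new with this commit: trade consistency guarantees for speed
    // (see the NEWS entry for eatmydata)
    bool eatmydata = false;
};

} }
```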
4 changes: 2 additions & 2 deletions arki/dataset/iseg/writer.h
@@ -21,10 +21,10 @@ class Writer : public DatasetAccess<iseg::Dataset, segmented::Writer>
index::SummaryCache scache;

/// Return an inserter for the given Metadata
std::unique_ptr<AppendSegment> file(const Metadata& md);
std::unique_ptr<AppendSegment> file(const segment::WriterConfig& writer_config, const Metadata& md);

/// Return an inserter for the given relative pathname
std::unique_ptr<AppendSegment> file(const std::string& relpath);
std::unique_ptr<AppendSegment> file(const segment::WriterConfig& writer_config, const std::string& relpath);

public:
Writer(std::shared_ptr<iseg::Dataset> config);
9 changes: 5 additions & 4 deletions arki/dataset/ondisk2/checker-test.cc
@@ -62,10 +62,11 @@ add_method("reindex_with_duplicates", [](Fixture& f) {
sys::makedirs("testds/2007/07");
// TODO: use segments also in the other tests, and instantiate a new test suite for different segment types
{
auto s = f.session()->segment_writer("grib", f.local_config()->path, "2007/07.grib");
s->append(data.mds[1], false);
s->append(data.mds[1], false);
s->append(data.mds[0], false);
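// assumption: a default-constructed WriterConfig leaves both eatmydata and
// drop_cached_data_on_commit off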
segment::WriterConfig writer_config;
auto s = f.session()->segment_writer(writer_config, "grib", f.local_config()->path, "2007/07.grib");
s->append(data.mds[1]);
s->append(data.mds[1]);
s->append(data.mds[0]);
s->commit();
}

(diff for the remaining 25 changed files not shown)
