Skip to content

Commit

Permalink
Fix initialization of DataStorm samples after session recovery - zero…
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Dec 19, 2024
1 parent 50fbf7e commit 0e31c7a
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,28 +1134,37 @@ SessionI::subscriberInitialized(
out << _id << ": initialized '" << element << "' from 'e" << elementId << '@' << topicId << "'";
}
elementSubscriber->initialized = true;
elementSubscriber->lastId = samples.empty() ? 0 : samples.back().id;

vector<shared_ptr<Sample>> samplesI;
samplesI.reserve(samples.size());
auto sampleFactory = element->getTopic()->getSampleFactory();
auto keyFactory = element->getTopic()->getKeyFactory();
for (const auto& sample : samples)
if (samples.empty())
{
assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first);

samplesI.push_back(sampleFactory->create(
_id,
elementSubscribers->name,
sample.id,
sample.event,
key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue),
subscriber.tags[sample.tag],
sample.value,
sample.timestamp));
assert(samplesI.back()->key);
return {};
}
else
{
assert(samples.back().id > elementSubscriber->lastId);
elementSubscriber->lastId = samples.back().id;

vector<shared_ptr<Sample>> samplesI;
samplesI.reserve(samples.size());
auto sampleFactory = element->getTopic()->getSampleFactory();
auto keyFactory = element->getTopic()->getKeyFactory();
for (const auto& sample : samples)
{
assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first);

samplesI.push_back(sampleFactory->create(
_id,
elementSubscribers->name,
sample.id,
sample.event,
key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue),
subscriber.tags[sample.tag],
sample.value,
sample.timestamp));
assert(samplesI.back()->key);
}
return samplesI;
}
return samplesI;
}

void
Expand Down

0 comments on commit 0e31c7a

Please sign in to comment.