Skip to content

Commit

Permalink
Simplify iterator end de-duplicate tuples.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Jun 27, 2024
1 parent 4f6e0a6 commit 9603e6b
Showing 1 changed file with 35 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@

public class KvinPartitioned implements Kvin {
static final Logger log = LoggerFactory.getLogger(KvinPartitioned.class);

final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
protected List<KvinListener> listeners = new ArrayList<>();
protected File path;
protected Duration archiveInterval;
Expand Down Expand Up @@ -180,16 +179,15 @@ protected IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI
return 0;
}));
long propertyValueCount;
URI currentProperty;
KvinTuple nextTuple;
KvinTuple prevTuple, nextTuple;
boolean closed;

@Override
public boolean hasNext() {
if (nextTuple != null) {
return true;
}
if (currentProperty == null) {
if (prevTuple == null) {
// this is the case if the iterator is not yet initialized
List<Kvin> stores = new ArrayList<>();
stores.add(hotStore);
Expand All @@ -207,37 +205,42 @@ public boolean hasNext() {
}
}
}
// skip properties if limit is reached
if (limit != 0 && propertyValueCount >= limit) {
while (!nextTuples.isEmpty()) {
var next = nextTuples.poll();
nextTuple = next.getFirst();
if (next.getSecond().hasNext()) {
nextTuples.add(new Pair<>(next.getSecond().next(), next.getSecond()));
} else {
next.getSecond().close();
}
if (!nextTuple.property.equals(currentProperty)) {
propertyValueCount = 1;
currentProperty = nextTuple.property;
break;
}
}
}
if (nextTuple == null && !nextTuples.isEmpty()) {
var next = nextTuples.poll();
nextTuple = next.getFirst();
if (currentProperty == null || !nextTuple.property.equals(currentProperty)) {

while (nextTuple == null && !nextTuples.isEmpty()) {
var min = nextTuples.poll();
var candidate = min.getFirst();

boolean isDuplicate = false;
if (prevTuple == null ||
!candidate.property.equals(prevTuple.property) ||
!candidate.item.equals(prevTuple.item)) {
propertyValueCount = 1;
currentProperty = nextTuple.property;
} else {
// omit duplicates in terms of id, time, and seqNr
isDuplicate = prevTuple != null
&& prevTuple.time == candidate.time
&& prevTuple.seqNr == candidate.seqNr;
}
if (next.getSecond().hasNext()) {
nextTuples.add(new Pair<>(next.getSecond().next(), next.getSecond()));

if (min.getSecond().hasNext()) {
nextTuples.add(new Pair<>(min.getSecond().next(), min.getSecond()));
} else {
next.getSecond().close();
min.getSecond().close();
}

// omit duplicates
if (isDuplicate) {
continue;
}
// skip properties if limit is reached
if (limit != 0 && propertyValueCount >= limit) {
continue;
}

nextTuple = candidate;
propertyValueCount++;
}

if (nextTuple != null) {
return true;
} else {
Expand All @@ -250,6 +253,7 @@ public boolean hasNext() {
public KvinTuple next() {
if (hasNext()) {
KvinTuple result = nextTuple;
prevTuple = result;
nextTuple = null;
return result;
}
Expand All @@ -258,7 +262,7 @@ public KvinTuple next() {

@Override
public void close() {
if (! closed) {
if (!closed) {
try {
while (!nextTuples.isEmpty()) {
try {
Expand Down

0 comments on commit 9603e6b

Please sign in to comment.