Skip to content

Commit

Permalink
Bug 36021033 - [35865034->23.09.2] Topics: messages getting lost befo…
Browse files Browse the repository at this point in the history
…re they are able to be consumed

(merge ce/main -> ce/23.09 104700)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v23.09/": change = 104702]
  • Loading branch information
thegridman committed Nov 17, 2023
1 parent c29a24c commit 8951e37
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 347 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2022, Oracle and/or its affiliates.
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -70,6 +70,24 @@ public BatchingOperationsQueue(Consumer<Integer> functionBatch, int cbInitialBat
Runnable::run);
}

/**
* Create a new {@link BatchingOperationsQueue} that will call the specified
* {@link Consumer} function to process a batch of operations.
* <p>
* This constructor takes a {@link Consumer} to use to complete futures. This allows us
* to, for example, optionally use a daemon pool to complete futures that may otherwise
* complete on a service thread. If the {code completer} parameter is {@code null} futures
* will complete on the calling thread.
*
* @param functionBatch the {@link Consumer} to call to process batches of operations
* @param cbInitialBatch the size of the initial batch of operations
* @param backlog the governing FlowControl object
*/
public BatchingOperationsQueue(Consumer<Integer> functionBatch, int cbInitialBatch, DebouncedFlowControl backlog)
{
this(functionBatch, cbInitialBatch, backlog, v -> 1, Runnable::run);
}

/**
* Create a new {@link BatchingOperationsQueue} that will call the specified
* {@link Consumer} function to process a batch of operations.
Expand Down Expand Up @@ -220,8 +238,7 @@ public LinkedList<V> getCurrentBatchValues()
*/
public boolean isBatchComplete()
{
purgeCurrentBatch();
return getCurrentBatchValues().isEmpty();
return purgeCurrentBatch();
}

/**
Expand Down Expand Up @@ -495,11 +512,28 @@ protected void triggerOperations(int cBatchSize)
*
* @param oValue the value to use to complete the elements
* @param onComplete an optional {@link Consumer} to call when requests are completed
*
* @return {@code true} if the element was completed
*/
@SuppressWarnings("unchecked")
public void completeElement(Object oValue, BiFunction<Throwable, V, Throwable> function, Consumer<R> onComplete)
public boolean completeElement(Object oValue, Consumer<R> onComplete)
{
completeElements(1, NullImplementation.getLongArray(), LongArray.singleton((R) oValue), function, onComplete);
Queue<Element> queueCurrent = getCurrentBatch();
boolean fCompleted = false;

// remove the element from the current batch
Element element = queueCurrent.poll();
if (element != null)
{
V value = element.getValue();
m_cbCurrentBatch -= value != null ? f_backlogCalculator.applyAsLong(value) : 0;
// If the element is not yet complete then...
if (!element.isDone())
{
fCompleted = element.completeSynchronous((R) oValue, onComplete);
}
}
return fCompleted;
}

/**
Expand Down Expand Up @@ -855,6 +889,34 @@ public void complete(R result, Consumer<R> onComplete)
}
}

/**
* Complete this element's {@link CompletableFuture} synchronously.
*
* @param result the value to use to complete the future
* @param onComplete an optional {@link Consumer} to call when the future is actually completed
*/
public boolean completeSynchronous(R result, Consumer<R> onComplete)
{
boolean fCompleted = false;
if (!m_fDone)
{
m_fDone = true;
fCompleted = f_future.complete(result);
if (fCompleted && onComplete != null)
{
try
{
onComplete.accept(result);
}
catch (Throwable t)
{
Logger.err(t);
}
}
}
return fCompleted;
}

/**
* Complete exceptionally this element's {@link CompletableFuture}.
*
Expand Down Expand Up @@ -1009,7 +1071,8 @@ default <R> void complete(CompletableFuture<R> future, R oValue, Consumer<R> onC
{
execute(() ->
{
if (onComplete != null)
boolean fCompleted = future.complete(oValue);
if (fCompleted && onComplete != null)
{
try
{
Expand All @@ -1020,7 +1083,6 @@ default <R> void complete(CompletableFuture<R> future, R oValue, Consumer<R> onC
Logger.err(t);
}
}
future.complete(oValue);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,11 @@ public void notifyAll(int[] anNotify)
int nPart = getPartition();
for (int nNotify : anNotify)
{
ctxNotify.getBackingMapEntry(toBinaryKey(new NotificationKey(nPart, nNotify))).remove(false);
InvocableMap.Entry entry = ctxNotify.getBackingMapEntry(toBinaryKey(new NotificationKey(nPart, nNotify)));
if (entry.isPresent())
{
entry.remove(false);
}
}
}
}
Expand Down Expand Up @@ -1502,7 +1506,21 @@ else if (lPage < lPageThis) // read from fully consumed page
{
Binary binPosKey = ContentKey.toBinary(f_nPartition, nChannel, lPage, nPos);
BinaryEntry entryElement = (BinaryEntry) ctxElements.getReadOnlyEntry(binPosKey);
Binary binValue = entryElement == null ? null : entryElement.getBinaryValue();
Binary binValue = entryElement.getBinaryValue();

if (binValue == null)
{
// For some reason the content cache entry was null.
// This could mean it really is not there (i.e. deleted or expired or something)
// or there is a race where the offer processor is still finishing and has committed
// the update to the Page but not yet committed the contents.
// So we will try to enlist the Content and get the value again
entryElement = ctxElements.getBackingMapEntry(binPosKey).asBinaryEntry();
if (entryElement.isPresent())
{
binValue = entryElement.getBinaryValue();
}
}

if (binValue != null && (filter == null || InvocableMapHelper.evaluateEntry(filter, entryElement)))
{
Expand Down Expand Up @@ -1557,7 +1575,18 @@ else if (lPage < lPageThis) // read from fully consumed page
if (nPos > nPosTail)
{
// that position is currently empty; register for notification when it is set
requestInsertionNotification(enlistPage(nChannel, lPage), nNotifierId, nChannel);
Page pageEnlisted = enlistPage(nChannel, lPage);
// The Page "may" have been update by an offer on another thread while we have been processing this method,
// so there may now be an entry in the page we have not read.
// Now we have enlisted the page it is not going to change, so we can tell by checking the tail.
int nPosTailLatest = pageEnlisted.getTail();
if (nPosTail == nPosTailLatest)
{
// the tail has not changed, so we need to add a notification
requestInsertionNotification(pageEnlisted, nNotifierId, nChannel);
}
// make sure the tail is set to the "latest" tail from the enlisted page
nPosTail = nPosTailLatest;
}
result = new PollProcessor.Result(nPosTail - nPos + 1, nPos, listValues, subscription.getSubscriptionHead());
}
Expand Down Expand Up @@ -2056,11 +2085,12 @@ else if (lPage < lPageSeek)
}

// if the commit position is after the new position roll it back too
PagedPosition committed = subscription.getCommittedPosition();
PagedPosition committed = subscription.getCommittedPosition();
long lPageCommitted = committed.getPage();
int nOffsetCommitted = committed.getOffset();
long lPageSub = subscription.getPage();
int nOffsetSub = subscription.getPosition();

if (lPageCommitted > lPageSub || (lPageCommitted == lPageSub && nOffsetCommitted > nOffsetSub))
{
PagedPosition posRollback = new PagedPosition(lPageSub, nOffsetSub);
Expand Down
Loading

0 comments on commit 8951e37

Please sign in to comment.