From 8951e3701aa44127dd5b05b0c7e7900319db145d Mon Sep 17 00:00:00 2001 From: Jonathan Knight Date: Fri, 17 Nov 2023 11:22:12 +0000 Subject: [PATCH] Bug 36021033 - [35865034->23.09.2] Topics: messages getting lost before they are able to be consumed (merge ce/main -> ce/23.09 104700) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v23.09/": change = 104702] --- .../impl/paged/BatchingOperationsQueue.java | 76 ++++- .../topic/impl/paged/PagedTopicPartition.java | 38 ++- .../impl/paged/PagedTopicSubscriber.java | 298 +++++++++++------- .../java/topics/AbstractNamedTopicTests.java | 239 ++------------ ...DefaultConfigJavaSerializerTopicTests.java | 6 +- .../DefaultConfigPofSerializerTopicTests.java | 6 +- .../java/topics/LocalNamedTopicTests.java | 225 ++++++++++++- .../src/main/java/topics/NamedTopicTests.java | 4 +- .../topics/bug_35945522/Bug35945522Tests.java | 17 +- .../topics/bug_35945522/PublisherMain.java | 22 +- .../topics/bug_35945522/SubscriberMain.java | 11 +- 11 files changed, 595 insertions(+), 347 deletions(-) diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java index f76c910671709..0a608e13de6f7 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/BatchingOperationsQueue.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2022, Oracle and/or its affiliates. + * Copyright (c) 2000, 2023, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -70,6 +70,24 @@ public BatchingOperationsQueue(Consumer functionBatch, int cbInitialBat Runnable::run); } + /** + * Create a new {@link BatchingOperationsQueue} that will call the specified + * {@link Consumer} function to process a batch of operations. + *

+ * This constructor takes a {@link Consumer} to use to complete futures. This allows us + * to, for example, optionally use a daemon pool to complete futures that may otherwise + * complete on a service thread. If the {code completer} parameter is {@code null} futures + * will complete on the calling thread. + * + * @param functionBatch the {@link Consumer} to call to process batches of operations + * @param cbInitialBatch the size of the initial batch of operations + * @param backlog the governing FlowControl object + */ + public BatchingOperationsQueue(Consumer functionBatch, int cbInitialBatch, DebouncedFlowControl backlog) + { + this(functionBatch, cbInitialBatch, backlog, v -> 1, Runnable::run); + } + /** * Create a new {@link BatchingOperationsQueue} that will call the specified * {@link Consumer} function to process a batch of operations. @@ -220,8 +238,7 @@ public LinkedList getCurrentBatchValues() */ public boolean isBatchComplete() { - purgeCurrentBatch(); - return getCurrentBatchValues().isEmpty(); + return purgeCurrentBatch(); } /** @@ -495,11 +512,28 @@ protected void triggerOperations(int cBatchSize) * * @param oValue the value to use to complete the elements * @param onComplete an optional {@link Consumer} to call when requests are completed + * + * @return {@code true} if the element was completed */ @SuppressWarnings("unchecked") - public void completeElement(Object oValue, BiFunction function, Consumer onComplete) + public boolean completeElement(Object oValue, Consumer onComplete) { - completeElements(1, NullImplementation.getLongArray(), LongArray.singleton((R) oValue), function, onComplete); + Queue queueCurrent = getCurrentBatch(); + boolean fCompleted = false; + + // remove the element from the current batch + Element element = queueCurrent.poll(); + if (element != null) + { + V value = element.getValue(); + m_cbCurrentBatch -= value != null ? f_backlogCalculator.applyAsLong(value) : 0; + // If the element is not yet complete then... + if (!element.isDone()) + { + fCompleted = element.completeSynchronous((R) oValue, onComplete); + } + } + return fCompleted; } /** @@ -855,6 +889,34 @@ public void complete(R result, Consumer onComplete) } } + /** + * Complete this element's {@link CompletableFuture} synchronously. + * + * @param result the value to use to complete the future + * @param onComplete an optional {@link Consumer} to call when the future is actually completed + */ + public boolean completeSynchronous(R result, Consumer onComplete) + { + boolean fCompleted = false; + if (!m_fDone) + { + m_fDone = true; + fCompleted = f_future.complete(result); + if (fCompleted && onComplete != null) + { + try + { + onComplete.accept(result); + } + catch (Throwable t) + { + Logger.err(t); + } + } + } + return fCompleted; + } + /** * Complete exceptionally this element's {@link CompletableFuture}. * @@ -1009,7 +1071,8 @@ default void complete(CompletableFuture future, R oValue, Consumer onC { execute(() -> { - if (onComplete != null) + boolean fCompleted = future.complete(oValue); + if (fCompleted && onComplete != null) { try { @@ -1020,7 +1083,6 @@ default void complete(CompletableFuture future, R oValue, Consumer onC Logger.err(t); } } - future.complete(oValue); }); } diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPartition.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPartition.java index d41453975dbbe..4c85747fae967 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPartition.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicPartition.java @@ -790,7 +790,11 @@ public void notifyAll(int[] anNotify) int nPart = getPartition(); for (int nNotify : anNotify) { - ctxNotify.getBackingMapEntry(toBinaryKey(new NotificationKey(nPart, nNotify))).remove(false); + InvocableMap.Entry entry = ctxNotify.getBackingMapEntry(toBinaryKey(new NotificationKey(nPart, nNotify))); + if (entry.isPresent()) + { + entry.remove(false); + } } } } @@ -1502,7 +1506,21 @@ else if (lPage < lPageThis) // read from fully consumed page { Binary binPosKey = ContentKey.toBinary(f_nPartition, nChannel, lPage, nPos); BinaryEntry entryElement = (BinaryEntry) ctxElements.getReadOnlyEntry(binPosKey); - Binary binValue = entryElement == null ? null : entryElement.getBinaryValue(); + Binary binValue = entryElement.getBinaryValue(); + + if (binValue == null) + { + // For some reason the content cache entry was null. + // This could mean it really is not there (i.e. deleted or expired or something) + // or there is a race where the offer processor is still finishing and has committed + // the update to the Page but not yet committed the contents. + // So we will try to enlist the Content and get the value again + entryElement = ctxElements.getBackingMapEntry(binPosKey).asBinaryEntry(); + if (entryElement.isPresent()) + { + binValue = entryElement.getBinaryValue(); + } + } if (binValue != null && (filter == null || InvocableMapHelper.evaluateEntry(filter, entryElement))) { @@ -1557,7 +1575,18 @@ else if (lPage < lPageThis) // read from fully consumed page if (nPos > nPosTail) { // that position is currently empty; register for notification when it is set - requestInsertionNotification(enlistPage(nChannel, lPage), nNotifierId, nChannel); + Page pageEnlisted = enlistPage(nChannel, lPage); + // The Page "may" have been update by an offer on another thread while we have been processing this method, + // so there may now be an entry in the page we have not read. + // Now we have enlisted the page it is not going to change, so we can tell by checking the tail. + int nPosTailLatest = pageEnlisted.getTail(); + if (nPosTail == nPosTailLatest) + { + // the tail has not changed, so we need to add a notification + requestInsertionNotification(pageEnlisted, nNotifierId, nChannel); + } + // make sure the tail is set to the "latest" tail from the enlisted page + nPosTail = nPosTailLatest; } result = new PollProcessor.Result(nPosTail - nPos + 1, nPos, listValues, subscription.getSubscriptionHead()); } @@ -2056,11 +2085,12 @@ else if (lPage < lPageSeek) } // if the commit position is after the new position roll it back too - PagedPosition committed = subscription.getCommittedPosition(); + PagedPosition committed = subscription.getCommittedPosition(); long lPageCommitted = committed.getPage(); int nOffsetCommitted = committed.getOffset(); long lPageSub = subscription.getPage(); int nOffsetSub = subscription.getPosition(); + if (lPageCommitted > lPageSub || (lPageCommitted == lPageSub && nOffsetCommitted > nOffsetSub)) { PagedPosition posRollback = new PagedPosition(lPageSub, nOffsetSub); diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.java index cbd90d86269ea..4650d3c44ccbc 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriber.java @@ -123,6 +123,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -1135,11 +1137,6 @@ private void receiveInternal(BatchingOperationsQueue queueRequest, I if (!queueRequest.isBatchComplete() || queueRequest.fillCurrentBatch(cBatch)) { - if (queueRequest.isBatchComplete()) - { - return; - } - heartbeat(); complete(queueRequest); @@ -1148,7 +1145,7 @@ private void receiveInternal(BatchingOperationsQueue queueRequest, I { // we have emptied the pre-fetch queue but the batch has more in it, so fetch more PagedTopicChannel channel = m_aChannel[nChannel]; - long lVersion = channel.m_lVersion.get(); + long lVersion = channel.getVersion(); long lHead = channel.m_lHead == PagedTopicChannel.HEAD_UNKNOWN ? getSubscriptionHead(channel) : channel.m_lHead; @@ -1249,12 +1246,22 @@ private long getSubscriptionHead(PagedTopicChannel channel) * * @param queueRequest the queue of requests to complete */ - @SuppressWarnings("unchecked") - private void complete(BatchingOperationsQueue queueRequest) + protected void complete(BatchingOperationsQueue queueRequest) { LinkedList queueBatch = queueRequest.getCurrentBatchValues(); + complete(queueRequest, queueBatch); + } - Queue queueValuesPrefetched = m_queueValuesPrefetched; + /** + * Complete as many outstanding requests as possible from the contents of the pre-fetch queue. + * + * @param queueRequest the queue of requests to complete + */ + @SuppressWarnings("unchecked") + protected void complete(BatchingOperationsQueue queueRequest, + LinkedList queueBatch) + { + ConcurrentLinkedDeque queuePrefetched = m_queueValuesPrefetched; Request firstRequest = queueBatch.peek(); while (firstRequest instanceof FunctionalRequest) @@ -1266,7 +1273,7 @@ private void complete(BatchingOperationsQueue queueRequest) int cValues = 0; int cRequest = queueBatch.size(); - if (isActive() && !queueValuesPrefetched.isEmpty()) + if (isActive() && !queuePrefetched.isEmpty()) { Gate gate = f_gate; // Wait to enter the gate as we need to check channel ownership under a lock @@ -1274,12 +1281,12 @@ private void complete(BatchingOperationsQueue queueRequest) try { LongArray aValues = new SparseArray<>(); - CommittableElement element = queueValuesPrefetched.peek(); + CommittableElement element = queuePrefetched.peek(); if (element != null && element.isEmpty()) { // we're empty, remove the empty/null element from the pre-fetch queue - queueValuesPrefetched.poll(); + queuePrefetched.poll(); while (cValues < cRequest) { Request request = queueBatch.get(cValues); @@ -1288,19 +1295,23 @@ private void complete(BatchingOperationsQueue queueRequest) ReceiveRequest receiveRequest = (ReceiveRequest) request; if (!receiveRequest.isBatch()) { + // this is a single request, i.e subscriber.receive(); aValues.set(cValues, null); } else { + // this is a batch request, i.e subscriber.receive(100); aValues.set(cValues, Collections.emptyList()); } cValues++; } } + // complete all the requests as "empty" + queueRequest.completeElements(cValues, aValues, this::onReceiveError, this::onReceiveComplete); } else { - while (m_nState == STATE_CONNECTED && cValues < cRequest && !queueValuesPrefetched.isEmpty()) + while (m_nState == STATE_CONNECTED && cValues < cRequest && !queuePrefetched.isEmpty()) { Request request = queueBatch.get(cValues); if (request instanceof ReceiveRequest) @@ -1308,32 +1319,51 @@ private void complete(BatchingOperationsQueue queueRequest) ReceiveRequest receiveRequest = (ReceiveRequest) request; if (receiveRequest.isBatch()) { + // this is a batch request, i.e subscriber.receive(100); int cElement = receiveRequest.getElementCount(); - List list = new ArrayList<>(); - for (int i = 0; i < cElement && !queueValuesPrefetched.isEmpty(); i++) + LinkedList list = new LinkedList<>(); + for (int i = 0; i < cElement && !queuePrefetched.isEmpty(); i++) { - element = queueValuesPrefetched.poll(); + element = queuePrefetched.poll(); // ensure we still own the channel if (element != null && !element.isEmpty() && isOwner(element.getChannel())) { list.add(element); } } - aValues.set(cValues++, list); + cValues++; + boolean fCompleted = queueRequest.completeElement(list, this::onReceiveComplete); + if (!fCompleted) + { + // failed to complete the future, it could have been cancelled + // push the all the elements back on the queue + CommittableElement e; + while ((e = list.pollLast()) != null) + { + queuePrefetched.offerFirst(e); + } + } } else { - element = queueValuesPrefetched.poll(); + // this is a single request, i.e subscriber.receive(); + element = queuePrefetched.poll(); // ensure we still own the channel if (element != null && !element.isEmpty() && isOwner(element.getChannel())) { - aValues.set(cValues++, element); + cValues++; + boolean fCompleted = queueRequest.completeElement(element, this::onReceiveComplete); + if (!fCompleted) + { + // failed to complete the future, it could have been cancelled + // push the element back on the queue + queuePrefetched.offerFirst(element); + } } } } } } - queueRequest.completeElements(cValues, aValues, this::onReceiveError, this::onReceiveComplete); } finally { @@ -1343,6 +1373,16 @@ private void complete(BatchingOperationsQueue queueRequest) } } + /** + * Return the size of the prefetch queue. + * + * @return the size of the prefetch queue + */ + public int getReceiveQueueSize() + { + return f_queueReceiveOrders.size(); + } + private Throwable onReceiveError(Throwable err, Object o) { m_cReceived.mark(); @@ -2000,16 +2040,6 @@ public boolean isConnected() return m_nState == STATE_CONNECTED; } - /** - * Returns {@code true} if this subscriber is initialising. - * - * @return {@code true} if this subscriber is initialising - */ - public boolean isInitialising() - { - return m_nState == STATE_INITIAL; - } - /** * Disconnect this subscriber. *

@@ -2548,100 +2578,108 @@ protected void onReceiveResult(PagedTopicChannel channel, long lVersion, long lP { int nChannel = channel.subscriberPartitionSync.getChannelId(); - // check that there is no error, and we still own the channel - if (e == null ) + f_receiveLock.lock(); + try { - Queue queueValues = result.getElements(); - int cReceived = queueValues.size(); - int cRemaining = result.getRemainingElementCount(); - int nNext = result.getNextIndex(); - - channel.setPolled(); - ++m_cPolls; - - if (cReceived == 0) + // check that there is no error, and we still own the channel + if (e == null ) { - ++m_cMisses; - } - else if (!queueValues.isEmpty()) - { - channel.setHit(); - m_cValues += cReceived; - channel.adjustPolls(cReceived); + Queue queueValues = result.getElements(); + int cReceived = queueValues.size(); + int cRemaining = result.getRemainingElementCount(); + int nNext = result.getNextIndex(); - // add the received elements to the pre-fetch queue - queueValues.stream() - .map(bin -> new CommittableElement(bin, nChannel)) - .forEach(m_queueValuesPrefetched::add); + channel.setPolled(); + ++m_cPolls; - if (!m_queueValuesPrefetched.isEmpty()) + if (cReceived == 0) { - long nTime = System.currentTimeMillis(); - channel.setFirstPolled((PagedPosition) m_queueValuesPrefetched.getFirst().getPosition(), nTime); - channel.setLastPolled((PagedPosition) m_queueValuesPrefetched.getLast().getPosition(), nTime); + ++m_cMisses; } - } + else if (!queueValues.isEmpty()) + { + channel.setHit(); + m_cValues += cReceived; + channel.adjustPolls(cReceived); - channel.m_nNext = nNext; + // add the received elements to the pre-fetch queue + queueValues.stream() + .map(bin -> new CommittableElement(bin, nChannel)) + .forEach(m_queueValuesPrefetched::add); - if (cRemaining == PollProcessor.Result.EXHAUSTED) - { - // we know the page is exhausted, so the new head is at least one higher - if (lPageId >= channel.m_lHead && lPageId != Page.NULL_PAGE) - { - channel.m_lHead = lPageId + 1; - channel.m_nNext = 0; + if (!m_queueValuesPrefetched.isEmpty()) + { + long nTime = System.currentTimeMillis(); + channel.setFirstPolled((PagedPosition) m_queueValuesPrefetched.getFirst().getPosition(), nTime); + channel.setLastPolled((PagedPosition) m_queueValuesPrefetched.getLast().getPosition(), nTime); + } } - // we're actually on the EMPTY_PAGE, so we'll concurrently increment the durable - // head pointer and then update our pointer accordingly - if (lPageId == Page.NULL_PAGE) - { - scheduleHeadIncrement(channel, lPageId); - } + channel.m_nNext = nNext; - // switch to a new channel since we've exhausted this page - switchChannel(); - } - else if (cRemaining == 0 || cRemaining == PollProcessor.Result.NOT_ALLOCATED_CHANNEL) - { - // we received nothing or polled a channel we do not own - if (cRemaining == 0) + if (cRemaining == PollProcessor.Result.EXHAUSTED) { - // we received nothing, mark the channel as empty - onChannelEmpty(nChannel, lVersion); - } + // we know the page is exhausted, so the new head is at least one higher + if (lPageId >= channel.m_lHead && lPageId != Page.NULL_PAGE) + { + channel.m_lHead = lPageId + 1; + channel.m_nNext = 0; + } + + // we're actually on the EMPTY_PAGE, so we'll concurrently increment the durable + // head pointer and then update our pointer accordingly + if (lPageId == Page.NULL_PAGE) + { + scheduleHeadIncrement(channel, lPageId); + } - // attempt to switch to a non-empty channel - if (!switchChannel()) + // switch to a new channel since we've exhausted this page + switchChannel(); + } + else if (cRemaining == 0 || cRemaining == PollProcessor.Result.NOT_ALLOCATED_CHANNEL) { - // we've run out of channels to poll from - if (f_fCompleteOnEmpty) + // we received nothing or polled a channel we do not own + if (cRemaining == 0) { - // add an empty element, which signals to the completion method that we're done - m_queueValuesPrefetched.add(getEmptyElement()); + // we received nothing, mark the channel as empty + onChannelEmpty(nChannel, lVersion); } - else + + // attempt to switch to a non-empty channel + if (!switchChannel()) { - // wait for non-empty; - // Note: automatically registered for notification as part of returning an empty result set - ++m_cWait; + // we've run out of channels to poll from + if (f_fCompleteOnEmpty) + { + // add an empty element, which signals to the completion method that we're done + m_queueValuesPrefetched.add(getEmptyElement()); + } + else + { + // wait for non-empty; + // Note: automatically registered for notification as part of returning an empty result set + ++m_cWait; + } } } + else if (cRemaining == PollProcessor.Result.UNKNOWN_SUBSCRIBER) + { + // The subscriber was unknown, possibly due to a persistence snapshot recovery or the topic being + // destroyed whilst the poll was in progress. + // Disconnect and let reconnection sort us out + disconnectInternal(true); + } } - else if (cRemaining == PollProcessor.Result.UNKNOWN_SUBSCRIBER) + else // remove failed; this is fairly catastrophic { - // The subscriber was unknown, possibly due to a persistence snapshot recovery or the topic being - // destroyed whilst the poll was in progress. - // Disconnect and let reconnection sort us out - disconnectInternal(true); + // TODO: figure out error handling + // fail all currently (and even concurrently) scheduled removes + f_queueReceiveOrders.handleError((err, bin) -> e, BatchingOperationsQueue.OnErrorAction.CompleteWithException); } } - else // remove failed; this is fairly catastrophic + finally { - // TODO: figure out error handling - // fail all currently (and even concurrently) scheduled removes - f_queueReceiveOrders.handleError((err, bin) -> e, BatchingOperationsQueue.OnErrorAction.CompleteWithException); + f_receiveLock.unlock(); } } @@ -3209,7 +3247,7 @@ public interface WithNotificationId * CommittableElement is a wrapper around a {@link PageElement} * that makes it committable. */ - private class CommittableElement + protected class CommittableElement implements Element { // ----- constructors ----------------------------------------------- @@ -3219,7 +3257,7 @@ private class CommittableElement * * @param binValue the binary element value */ - CommittableElement(Binary binValue, int nChannel) + protected CommittableElement(Binary binValue, int nChannel) { m_element = PageElement.fromBinary(binValue, f_serializer); f_nChannel = nChannel; @@ -3513,9 +3551,30 @@ public double getReceivedFifteenMinuteRate() */ protected void setEmpty(long lVersion) { - if (m_lVersion.get() == lVersion) + m_lock.lock(); + try { - m_fEmpty = true; + if (m_lVersion.get() == lVersion) + { + m_fEmpty = true; + } + } + finally + { + m_lock.unlock(); + } + } + + protected long getVersion() + { + m_lock.lock(); + try + { + return m_lVersion.get(); + } + finally + { + m_lock.unlock(); } } @@ -3543,8 +3602,16 @@ protected void onChannelPopulatedNotification() */ protected void setPopulated() { - m_lVersion.incrementAndGet(); - m_fEmpty = false; + m_lock.lock(); + try + { + long l = m_lVersion.incrementAndGet(); + m_fEmpty = false; + } + finally + { + m_lock.unlock(); + } } /** @@ -3637,10 +3704,11 @@ public String toString() return "Channel=" + subscriberPartitionSync.getChannelId() + ", owned=" + m_fOwned + ", empty=" + m_fEmpty + + ", version=" + m_lVersion.get() + ", head=" + m_lHead + ", next=" + m_nNext + ", polls=" + m_cPolls + - ", received=" + m_cReceived + + ", received=" + m_cReceived.getCount() + ", committed=" + m_cCommited + ", first=" + m_firstPolled + ", firstTimestamp=" + m_firstPolledTimestamp + @@ -3764,6 +3832,11 @@ public String toString() * A flag indicating a message has been received from this channel since ownership was last assigned. */ boolean m_fHit; + + /** + * The lock to control state updates. + */ + private final Lock m_lock = new ReentrantLock(); } // ----- inner class: FlushMode ---------------------------------------- @@ -3955,7 +4028,7 @@ private SeekRequest(SeekType type, Map mapPosition, Instant i protected void execute(PagedTopicSubscriber subscriber, BatchingOperationsQueue queueBatch) { Map map = subscriber.seekInternal(this); - queueBatch.completeElement(map, this::onRequestError, this::onRequestComplete); + queueBatch.completeElement(map, this::onRequestComplete); } /** @@ -4082,7 +4155,7 @@ protected void execute(PagedTopicSubscriber subscriber, BatchingOperationsQue default: throw new IllegalStateException("Unexpected value: " + f_type); } - queue.completeElement(map, this::onRequestError, this::onRequestComplete); + queue.completeElement(map, this::onRequestComplete); } // ----- data members --------------------------------------------------- @@ -4648,6 +4721,11 @@ public interface StateListener */ private final Gate f_gateState; + /** + * The lock to control receive processing. + */ + private final Lock f_receiveLock = new ReentrantLock(); + /** * The state of the subscriber. */ @@ -4662,7 +4740,7 @@ public interface StateListener /** * Optional queue of prefetched values which can be used to fulfil future receive requests. */ - private final ConcurrentLinkedDeque m_queueValuesPrefetched = new ConcurrentLinkedDeque<>(); + protected final ConcurrentLinkedDeque m_queueValuesPrefetched = new ConcurrentLinkedDeque<>(); /** * Queue of pending receive awaiting values. diff --git a/prj/test/functional/topics/src/main/java/topics/AbstractNamedTopicTests.java b/prj/test/functional/topics/src/main/java/topics/AbstractNamedTopicTests.java index d32be7ee22ad1..49f40298dc44b 100644 --- a/prj/test/functional/topics/src/main/java/topics/AbstractNamedTopicTests.java +++ b/prj/test/functional/topics/src/main/java/topics/AbstractNamedTopicTests.java @@ -70,10 +70,10 @@ import com.tangosol.util.filter.LessFilter; import java.io.File; -import java.io.FileNotFoundException; +import java.io.FileWriter; import java.io.IOException; -import java.io.PrintStream; +import java.io.PrintWriter; import java.io.Serializable; import java.time.Instant; import java.util.ArrayList; @@ -188,14 +188,14 @@ public static void setProperties() @Before public void beforeEach() { - System.err.println(">>>>> Starting test: " + m_testName.getMethodName()); + System.err.println(">>>>> Starting test: " + m_testWatcher.getMethodName()); System.err.flush(); } @After public void cleanup() { - System.err.println(">>>>> Starting cleanup: " + m_testName.getMethodName()); + System.err.println(">>>>> Starting cleanup: " + m_testWatcher.getMethodName()); try { if (m_topic != null) @@ -219,7 +219,7 @@ else if (m_sTopicName != null) } finally { - System.err.println(">>>>> Finished test: " + m_testName.getMethodName()); + System.err.println(">>>>> Finished test: " + m_testWatcher.getMethodName()); System.err.flush(); } } @@ -2586,6 +2586,7 @@ public void shouldThrottlePublisher() throws Exception } @Test + @Ignore("This test is flawed as it needs a message in almost every partition to guarantee to pass but that can cause the test to take too long and fail anyway") public void shouldThrottlePublisherWhenFull() throws Exception { final long SERVER_CAPACITY = 10L * 1024L; @@ -2631,7 +2632,7 @@ public void shouldThrottlePublisherWhenFull() throws Exception for (int c = cValues.get(); c > 0; --c) { futureDrain = subscriber.receive() - .whenComplete((r, e) -> + .whenCompleteAsync((r, e) -> { cValues.decrementAndGet(); r.commit(); @@ -3111,198 +3112,6 @@ public void shouldSeekGroupSubscriberForwards() throws Exception } } - @Test - public void shouldSeekGroupSubscriberForwardsUsingTimestamp() throws Exception - { - NamedTopic topic = ensureTopic(m_sSerializer + "-small-rewindable"); - - Assume.assumeThat("Test only applies when paged-topic-scheme has retain-consumed configured", - getDependencies(topic).isRetainConsumed(), is(true)); - - // publish a lot of messages, so we have multiple pages spread over all the partitions - String sSuffix = "abcdefghijklmnopqrstuvwxyz"; - int nChannel = 1; - - try (Publisher publisher = topic.createPublisher(OrderBy.id(nChannel))) - { - for (int i = 0; i < 500; i++) - { - String sMsg = "element-" + i + sSuffix; - CompletableFuture future = publisher.publish(sMsg); - Status status = future.get(1, TimeUnit.MINUTES); - - assertThat(status.getChannel(), is(nChannel)); - // sleep to ensure that every message has a different millisecond timestamp - Thread.sleep(3L); - } - } - - // Create two subscribers in different groups. - // We will receive messages from one and then seek the other to the same place - String sGroupPrefix = ensureGroupName(); - try (PagedTopicSubscriber subscriberOne = (PagedTopicSubscriber) topic.createSubscriber(inGroup(sGroupPrefix + "-one"), Subscriber.CompleteOnEmpty.enabled()); - PagedTopicSubscriber subscriberTwo = (PagedTopicSubscriber) topic.createSubscriber(inGroup(sGroupPrefix + "-two"))) - { - // move subscriber two on by receiving pages - // (we'll then seek subscriber one to the same place) - CompletableFuture> future = null; - for (int i = 0; i < 250; i++) - { - future = subscriberTwo.receive(); - } - - // Obtain the last received element for subscriber two - Element element = future.get(2, TimeUnit.MINUTES); - PagedPosition pagedPosition = (PagedPosition) element.getPosition(); - int nOffset = pagedPosition.getOffset(); - - // ensure the position is not a head or tail of a page - PagedTopicCaches caches = new PagedTopicCaches(topic.getName(), (PagedTopicService) topic.getService()); - Page page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); - while (nOffset == 0 || nOffset == page.getTail()) - { - // we're at a head or tail so read another - future = subscriberTwo.receive(); - element = future.get(2, TimeUnit.MINUTES); - pagedPosition = (PagedPosition) element.getPosition(); - nOffset = pagedPosition.getOffset(); - page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); - } - - Instant seekTimestamp = element.getTimestamp(); - PagedPosition expectedSeekPosition = (PagedPosition) element.getPosition(); - - // Poll the next element from subscriber two - Element elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - while (elementTwo.getTimestamp().equals(seekTimestamp)) - { - // the next element is the same timestamp, so we need to read more - expectedSeekPosition = (PagedPosition) elementTwo.getPosition(); - elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - } - - // we know we're now not at a head or tail of a page - // Seek subscriber one to the next message with a timestamp higher than seekTimestamp - Position result = subscriberOne.seek(element.getChannel(), seekTimestamp); - // should have the correct seek result - assertThat(result, is(expectedSeekPosition)); - - // Poll the next element for each subscriber, they should match - Element elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); - assertThat(elementOne, is(notNullValue())); - assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); - assertThat(elementOne.getValue(), is(elementTwo.getValue())); - - // now move subscriber two some way ahead - for (int i = 0; i < 100; i++) - { - subscriberTwo.receive().get(2, TimeUnit.MINUTES); - } - - element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - pagedPosition = (PagedPosition) element.getPosition(); - nOffset = pagedPosition.getOffset(); - page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); - - // keep reading until subscriber two has read the tail of the page - while (nOffset != page.getTail()) - { - // we're not at the tail so read another - future = subscriberTwo.receive(); - element = future.get(2, TimeUnit.MINUTES); - pagedPosition = (PagedPosition) element.getPosition(); - nOffset = pagedPosition.getOffset(); - } - - // we're now at the tail of a page - // Seek subscriber one to the last timestamp read by subscription two - System.err.println(">>>> Seeking subscriber one to timestamp from element: " + element + " "); - seekTimestamp = element.getTimestamp(); - result = subscriberOne.seek(element.getChannel(), seekTimestamp); - // should have a seeked result - assertThat(result, is(notNullValue())); - System.err.println(">>>> Seeked subscriber one to timestamp from element: " + element + " result: " + result); - - // Poll the next element for each subscriber, they should match - elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - // ensure that we have read a later timestamp than the seeked to position - while (!elementTwo.getTimestamp().isAfter(seekTimestamp)) - { - elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - } - - elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); - assertThat(elementOne, is(notNullValue())); - - System.err.println(">>>> ElementOne: " + elementOne); - System.err.println(">>>> ElementTwo: " + elementTwo); - - assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); - assertThat(elementOne.getValue(), is(elementTwo.getValue())); - - // now move subscriber two some way ahead - for (int i = 0; i < 100; i++) - { - subscriberTwo.receive().get(2, TimeUnit.MINUTES); - } - - // Poll the next element from subscriber two - element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(element, is(notNullValue())); - while (element.getTimestamp().equals(seekTimestamp)) - { - // the next element is the same timestamp, so we need to read more - element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(element, is(notNullValue())); - } - - pagedPosition = (PagedPosition) element.getPosition(); - nOffset = pagedPosition.getOffset(); - expectedSeekPosition = pagedPosition; - - // keep reading until subscriber two has read the head of the page - while (nOffset != 0) - { - // we're not at the head so read another - future = subscriberTwo.receive(); - element = future.get(2, TimeUnit.MINUTES); - pagedPosition = (PagedPosition) element.getPosition(); - nOffset = pagedPosition.getOffset(); - expectedSeekPosition = pagedPosition; - } - - // we're now at the head of a page - // Seek subscriber one to the last timestamp read by subscription two - System.err.println(">>>> Seeking subscriber one to timestamp from element: " + element + " "); - seekTimestamp = element.getTimestamp(); - result = subscriberOne.seek(element.getChannel(), seekTimestamp); - System.err.println(">>>> Seeked subscriber one to timestamp from element: " + element + " result: " + result); - // should have correct seeked result - assertThat(result, is(expectedSeekPosition)); - - // Poll the next element for each subscriber, they should match - elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - // ensure that we have read a later timestamp than the seeked to position - while (!elementTwo.getTimestamp().isAfter(seekTimestamp)) - { - elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); - assertThat(elementTwo, is(notNullValue())); - } - - elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); - assertThat(elementOne, is(notNullValue())); - System.err.println(">>>> ElementOne: " + elementOne); - System.err.println(">>>> ElementTwo: " + elementTwo); - assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); - assertThat(elementOne.getValue(), is(elementTwo.getValue())); - } - } - @Test public void shouldSeekGroupSubscriberForwardsAfterReadingSome() throws Exception { @@ -4109,7 +3918,13 @@ public void shouldSeekToHead() throws Exception { for (int i = 0; i < 10000; i++) { - publisher.publish("element-" + i); + int n = i; + String sMsg = "element-" + i; + publisher.publish(sMsg) + .thenAcceptAsync(status -> + { + m_testWatcher.println(">>>> Published message " + n + " '" + sMsg + "' status=" + status); + }); } publisher.flush().get(2, TimeUnit.MINUTES); @@ -4119,6 +3934,8 @@ public void shouldSeekToHead() throws Exception assertThat(elementHead, is(notNullValue())); assertThat(elementHead.getValue(), is("element-0")); +System.setProperty("test.log.page", String.valueOf(((PagedPosition) elementHead.getPosition()).getPage())); + CompletableFuture> future = null; for (int i = 0; i < 5000; i ++) { @@ -4130,9 +3947,15 @@ public void shouldSeekToHead() throws Exception assertThat(nChannelSub, is(nChannel)); // seek to the head of the channel - subscriber.seekToHead(nChannelSub); + Map map = subscriber.getHeads(); + System.err.println(">>>> Current subscriber heads are " + map); + System.err.println(">>>> Seeking subscriber to head for channel " + nChannelSub); + map = subscriber.seekToHead(nChannelSub); + System.err.println(">>>> Seeked to head: " + map); + System.err.println(">>>> Calling receive on : " + subscriber); element = subscriber.receive().get(2, TimeUnit.MINUTES); + System.err.println(">>>> Received element : " + element); assertThat(element, is(notNullValue())); assertThat(element.getValue(), is(elementHead.getValue())); assertThat(element.getPosition(), is(elementHead.getPosition())); @@ -4551,7 +4374,7 @@ public void shouldPurgeSubscriberGroup() throws Exception * Assert that {@code subscriberTest} has been positioned at the tail of the topic. *

* Subscriber {@code subscriberExpected} has been positioned at the tail by calling {@link Subscriber#receive()} - * whereas {@code subscriberTest} has been positioned by a call to {@link Subscriber#seek}. + * whereas {@code subscriberTest} has been positioned by a call to {@link Subscriber#seek(int, Position)}. * * @param subscriberTest the {@link Subscriber} to seek * @param subscriberExpected the {@link Subscriber} already at the tail @@ -4873,9 +4696,9 @@ public Statement apply(Statement base, Description d) { Class testClass = d.getTestClass(); File folder = MavenProjectFileUtils.ensureTestOutputBaseFolder(testClass); - m_out = new PrintStream(new File(folder, testClass.getSimpleName() + ".log")); + m_out = new PrintWriter(new FileWriter(new File(folder, testClass.getSimpleName() + ".log"), true)); } - catch (FileNotFoundException e) + catch (IOException e) { throw Exceptions.ensureRuntimeException(e); } @@ -4931,6 +4754,12 @@ protected void finished(Description description) super.finished(description); } + protected void println(String sMessage) + { + m_out.println(sMessage); + m_out.flush(); + } + /** * @return the name of the currently-running test method */ @@ -4942,7 +4771,7 @@ public String getMethodName() { private volatile String m_sName; - private PrintStream m_out; + private PrintWriter m_out; } static class ListLogger @@ -4979,7 +4808,7 @@ ArrayList getLog() public static final ThreadDumpOnTimeoutRule timeout = ThreadDumpOnTimeoutRule.after(30, TimeUnit.MINUTES); @Rule - public Watcher m_testName = new Watcher(); + public Watcher m_testWatcher = new Watcher(); // MUST BE STATIC because JUnit creates a new test class per test method protected static final AtomicInteger m_nTopic = new AtomicInteger(0); diff --git a/prj/test/functional/topics/src/main/java/topics/DefaultConfigJavaSerializerTopicTests.java b/prj/test/functional/topics/src/main/java/topics/DefaultConfigJavaSerializerTopicTests.java index 8a280396764e2..cfed2bf46c091 100644 --- a/prj/test/functional/topics/src/main/java/topics/DefaultConfigJavaSerializerTopicTests.java +++ b/prj/test/functional/topics/src/main/java/topics/DefaultConfigJavaSerializerTopicTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2022, Oracle and/or its affiliates. + * Copyright (c) 2000, 2023, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -66,7 +66,7 @@ public static void setup() throws Exception @Before public void logStart() { - String sMsg = ">>>>> Starting test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Starting test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> System.err.println(sMsg)).join(); @@ -76,7 +76,7 @@ public void logStart() @After public void logEnd() { - String sMsg = ">>>>> Finished test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Finished test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> System.err.println(sMsg)).join(); diff --git a/prj/test/functional/topics/src/main/java/topics/DefaultConfigPofSerializerTopicTests.java b/prj/test/functional/topics/src/main/java/topics/DefaultConfigPofSerializerTopicTests.java index 8964b98c273f4..2ae959bae6393 100644 --- a/prj/test/functional/topics/src/main/java/topics/DefaultConfigPofSerializerTopicTests.java +++ b/prj/test/functional/topics/src/main/java/topics/DefaultConfigPofSerializerTopicTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2022, Oracle and/or its affiliates. + * Copyright (c) 2000, 2023, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -69,7 +69,7 @@ public static void setup() throws Exception @Before public void logStart() { - String sMsg = ">>>>> Starting test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Starting test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> System.err.println(sMsg)).join(); @@ -79,7 +79,7 @@ public void logStart() @After public void logEnd() { - String sMsg = ">>>>> Finished test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Finished test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> System.err.println(sMsg)).join(); diff --git a/prj/test/functional/topics/src/main/java/topics/LocalNamedTopicTests.java b/prj/test/functional/topics/src/main/java/topics/LocalNamedTopicTests.java index fd06e218ed764..b41d1b0787c84 100644 --- a/prj/test/functional/topics/src/main/java/topics/LocalNamedTopicTests.java +++ b/prj/test/functional/topics/src/main/java/topics/LocalNamedTopicTests.java @@ -16,8 +16,11 @@ import com.tangosol.internal.net.ConfigurableCacheFactorySession; import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches; +import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber; import com.tangosol.internal.net.topic.impl.paged.model.ContentKey; +import com.tangosol.internal.net.topic.impl.paged.model.Page; +import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition; import com.tangosol.net.BackingMapContext; import com.tangosol.net.BackingMapManager; import com.tangosol.net.CacheFactory; @@ -25,6 +28,7 @@ import com.tangosol.net.DefaultCacheServer; import com.tangosol.net.ExtensibleConfigurableCacheFactory; import com.tangosol.net.NamedCache; +import com.tangosol.net.PagedTopicService; import com.tangosol.net.Session; import com.tangosol.net.events.EventDispatcher; @@ -34,6 +38,7 @@ import com.tangosol.net.events.partition.cache.PartitionedCacheDispatcher; import com.tangosol.net.partition.ObservableSplittingBackingCache; +import com.tangosol.net.topic.Position; import com.tangosol.net.topic.Subscriber.CompleteOnEmpty; import com.tangosol.run.xml.XmlElement; import com.tangosol.run.xml.XmlHelper; @@ -63,11 +68,13 @@ import org.junit.runners.Parameterized; import java.net.URL; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -77,12 +84,12 @@ import static com.oracle.bedrock.deferred.DeferredHelper.invoking; import static com.tangosol.net.cache.TypeAssertion.withoutTypeChecking; -import static com.tangosol.net.topic.Subscriber.Name.of; import static com.tangosol.net.topic.Subscriber.inGroup; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -314,7 +321,7 @@ public void shouldHandleErrorWhenPublishing() throws Exception // topic should contain just the first value assertThat(subscriber.receive().get(2, TimeUnit.MINUTES).getValue(), is(sPrefix + 0)); - assertThat(subscriber.receive().get(2, TimeUnit.MINUTES) == null, is(true)); + assertThat(subscriber.receive().get(2, TimeUnit.MINUTES), is(nullValue())); } for (int i = 0; i < listFutures.size(); i++) @@ -395,6 +402,220 @@ public void shouldUseLocalScheme() } } + @Test + public void shouldSeekGroupSubscriberForwardsUsingTimestamp() throws Exception + { + NamedTopic topic = ensureTopic(m_sSerializer + "-small-rewindable"); + + Assume.assumeThat("Test only applies when paged-topic-scheme has retain-consumed configured", + getDependencies(topic).isRetainConsumed(), is(true)); + + // publish a lot of messages, so we have multiple pages spread over all the partitions + String sSuffix = "abcdefghijklmnopqrstuvwxyz"; + int nChannel = 1; + + try (Publisher publisher = topic.createPublisher(Publisher.OrderBy.id(nChannel))) + { + for (int i = 0; i < 500; i++) + { + String sMsg = "element-" + i + sSuffix; + CompletableFuture future = publisher.publish(sMsg); + Publisher.Status status = future.get(1, TimeUnit.MINUTES); + + assertThat(status.getChannel(), is(nChannel)); + // sleep to ensure that every message has a different millisecond timestamp + Thread.sleep(3L); + } + } + + // Create two subscribers in different groups. + // We will receive messages from one and then seek the other to the same place + String sGroupPrefix = ensureGroupName(); + try (PagedTopicSubscriber subscriberOne = (PagedTopicSubscriber) topic.createSubscriber(inGroup(sGroupPrefix + "-one"), Subscriber.CompleteOnEmpty.enabled()); + PagedTopicSubscriber subscriberTwo = (PagedTopicSubscriber) topic.createSubscriber(inGroup(sGroupPrefix + "-two"))) + { + // move subscriber two on by receiving pages + // (we'll then seek subscriber one to the same place) + CompletableFuture> future = null; + for (int i = 0; i < 250; i++) + { + future = subscriberTwo.receive(); + } + + // Obtain the last received element for subscriber two + Subscriber.Element element = future.get(2, TimeUnit.MINUTES); + PagedPosition pagedPosition = (PagedPosition) element.getPosition(); + int nOffset = pagedPosition.getOffset(); + + // ensure the position is not a head or tail of a page + PagedTopicCaches caches = new PagedTopicCaches(topic.getName(), (PagedTopicService) topic.getService()); + Page page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); + while (nOffset == 0 || nOffset == page.getTail()) + { + // we're at a head or tail so read another + future = subscriberTwo.receive(); + element = future.get(2, TimeUnit.MINUTES); + pagedPosition = (PagedPosition) element.getPosition(); + nOffset = pagedPosition.getOffset(); + page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); + } + + Instant seekTimestamp = element.getTimestamp(); + PagedPosition expectedSeekPosition = (PagedPosition) element.getPosition(); + + // Poll the next element from subscriber two + Subscriber.Element elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + while (elementTwo.getTimestamp().equals(seekTimestamp)) + { + // the next element is the same timestamp, so we need to read more + expectedSeekPosition = (PagedPosition) elementTwo.getPosition(); + elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + } + + // we know we're now not at a head or tail of a page + // Seek subscriber one to the next message with a timestamp higher than seekTimestamp + Position result = subscriberOne.seek(element.getChannel(), seekTimestamp); + // should have the correct seek result + assertThat(result, is(expectedSeekPosition)); + + // Poll the next element for each subscriber, they should match + Subscriber.Element elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); + assertThat(elementOne, is(notNullValue())); + assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); + assertThat(elementOne.getValue(), is(elementTwo.getValue())); + + // now move subscriber two some way ahead + for (int i = 0; i < 100; i++) + { + subscriberTwo.receive().get(2, TimeUnit.MINUTES); + } + + element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + pagedPosition = (PagedPosition) element.getPosition(); + nOffset = pagedPosition.getOffset(); + page = caches.Pages.get(new Page.Key(nChannel, pagedPosition.getPage())); + + // keep reading until subscriber two has read the tail of the page + while (nOffset != page.getTail()) + { + // we're not at the tail so read another + future = subscriberTwo.receive(); + element = future.get(2, TimeUnit.MINUTES); + pagedPosition = (PagedPosition) element.getPosition(); + nOffset = pagedPosition.getOffset(); + } + + // we're now at the tail of a page + // Seek subscriber one to the last timestamp read by subscription two + System.err.println(">>>> Seeking subscriber one to timestamp from element: " + element + " "); + seekTimestamp = element.getTimestamp(); + result = subscriberOne.seek(element.getChannel(), seekTimestamp); + // should have a seeked result + Optional> peek = subscriberOne.peek(element.getChannel()); + if (peek.isPresent()) + { + Subscriber.Element elementPeek = peek.get(); + System.err.println(">>>> Head element timestamp is " + elementPeek.getTimestamp()); + System.err.println(">>>> Head element is " + elementPeek); + } + else + { + System.err.println(">>>> Head element is MISSING!!!!"); + } + assertThat(result, is(notNullValue())); + System.err.println(">>>> Seeked subscriber one to timestamp from element: " + element + " result: " + result); + + // Poll the next element for each subscriber, they should match + elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + // ensure that we have read a later timestamp than the seeked to position + while (!elementTwo.getTimestamp().isAfter(seekTimestamp)) + { + elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + } + + elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); + assertThat(elementOne, is(notNullValue())); + + System.err.println(">>>> ElementOne: " + elementOne); + System.err.println(">>>> ElementTwo: " + elementTwo); + + assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); + assertThat(elementOne.getValue(), is(elementTwo.getValue())); + + // now move subscriber two some way ahead + for (int i = 0; i < 100; i++) + { + subscriberTwo.receive().get(2, TimeUnit.MINUTES); + } + + // Poll the next element from subscriber two + element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(element, is(notNullValue())); + while (element.getTimestamp().equals(seekTimestamp)) + { + // the next element is the same timestamp, so we need to read more + element = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(element, is(notNullValue())); + } + + pagedPosition = (PagedPosition) element.getPosition(); + nOffset = pagedPosition.getOffset(); + expectedSeekPosition = pagedPosition; + + // keep reading until subscriber two has read the head of the page + while (nOffset != 0) + { + // we're not at the head so read another + future = subscriberTwo.receive(); + element = future.get(2, TimeUnit.MINUTES); + pagedPosition = (PagedPosition) element.getPosition(); + nOffset = pagedPosition.getOffset(); + expectedSeekPosition = pagedPosition; + } + + // we're now at the head of a page + // Seek subscriber one to the last timestamp read by subscription two + System.err.println(">>>> Seeking subscriber one to timestamp from element: " + element + " "); + seekTimestamp = element.getTimestamp(); + result = subscriberOne.seek(element.getChannel(), seekTimestamp); + System.err.println(">>>> Seeked subscriber one to timestamp from element: " + element + " result: " + result); + peek = subscriberOne.peek(element.getChannel()); + if (peek.isPresent()) + { + Subscriber.Element elementPeek = peek.get(); + System.err.println(">>>> Head element timestamp is " + elementPeek.getTimestamp()); + System.err.println(">>>> Head element is " + elementPeek); + } + else + { + System.err.println(">>>> Head element is MISSING!!!!"); + } + // should have correct seeked result + assertThat(result, is(expectedSeekPosition)); + + // Poll the next element for each subscriber, they should match + elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + // ensure that we have read a later timestamp than the seeked to position + while (!elementTwo.getTimestamp().isAfter(seekTimestamp)) + { + elementTwo = subscriberTwo.receive().get(2, TimeUnit.MINUTES); + assertThat(elementTwo, is(notNullValue())); + } + + elementOne = subscriberOne.receive().get(2, TimeUnit.MINUTES); + assertThat(elementOne, is(notNullValue())); + System.err.println(">>>> ElementOne: " + elementOne); + System.err.println(">>>> ElementTwo: " + elementTwo); + assertThat(elementOne.getPosition(), is(elementTwo.getPosition())); + assertThat(elementOne.getValue(), is(elementTwo.getValue())); + } + } + // ----- helper methods ------------------------------------------------- diff --git a/prj/test/functional/topics/src/main/java/topics/NamedTopicTests.java b/prj/test/functional/topics/src/main/java/topics/NamedTopicTests.java index 8b7db47e0fb5e..c41fce0acc2c4 100644 --- a/prj/test/functional/topics/src/main/java/topics/NamedTopicTests.java +++ b/prj/test/functional/topics/src/main/java/topics/NamedTopicTests.java @@ -79,7 +79,7 @@ public static void setupClass() @Before public void logStart() { - String sMsg = ">>>>> Starting test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Starting test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> @@ -94,7 +94,7 @@ public void logStart() @After public void logEnd() { - String sMsg = ">>>>> Finished test: " + m_testName.getMethodName(); + String sMsg = ">>>>> Finished test: " + m_testWatcher.getMethodName(); for (CoherenceClusterMember member : cluster.getCluster()) { member.submit(() -> diff --git a/prj/test/functional/topics/src/main/java/topics/bug_35945522/Bug35945522Tests.java b/prj/test/functional/topics/src/main/java/topics/bug_35945522/Bug35945522Tests.java index ee7d2ea0897e4..aaee33d949884 100644 --- a/prj/test/functional/topics/src/main/java/topics/bug_35945522/Bug35945522Tests.java +++ b/prj/test/functional/topics/src/main/java/topics/bug_35945522/Bug35945522Tests.java @@ -17,6 +17,7 @@ import com.oracle.bedrock.runtime.coherence.options.Logging; import com.oracle.bedrock.runtime.coherence.options.RoleName; import com.oracle.bedrock.runtime.coherence.options.WellKnownAddress; +import com.oracle.bedrock.runtime.concurrent.runnable.ThreadDump; import com.oracle.bedrock.runtime.java.options.ClassName; import com.oracle.bedrock.runtime.java.options.HeapSize; import com.oracle.bedrock.runtime.java.options.IPv4Preferred; @@ -25,12 +26,12 @@ import com.oracle.bedrock.testsupport.deferred.Eventually; import com.oracle.bedrock.testsupport.junit.TestLogs; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import topics.NamedTopicTests; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -50,7 +51,6 @@ @SuppressWarnings("resource") public class Bug35945522Tests { - @Ignore("Disabled until we fix the intermittent failure") @Test public void shouldContinueToReceive() throws Exception { @@ -97,13 +97,24 @@ public void shouldContinueToReceive() throws Exception } // publish 2 messages to each channel + System.err.println("***** Publishing first message"); assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel)); + System.err.println("***** Publishing second message"); assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel)); // wait for the subscribers to receive them for (CoherenceClusterMember member : mapSub.values()) { - Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_RECEIVED_COUNT), is(2)); + System.err.println("***** Checking member " + member.getName() + " for two messages"); + try + { + Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_RECEIVED_COUNT), is(2)); + } + catch (AssertionError e) + { + member.submit(ThreadDump.toStdErr()).get(5, TimeUnit.MINUTES); + throw e; + } } // kill a subscriber from the middle of the set of subscribers diff --git a/prj/test/functional/topics/src/main/java/topics/bug_35945522/PublisherMain.java b/prj/test/functional/topics/src/main/java/topics/bug_35945522/PublisherMain.java index 56014fc07e415..b3ff3be4c5fa3 100644 --- a/prj/test/functional/topics/src/main/java/topics/bug_35945522/PublisherMain.java +++ b/prj/test/functional/topics/src/main/java/topics/bug_35945522/PublisherMain.java @@ -14,7 +14,6 @@ import com.tangosol.net.topic.NamedTopic; import com.tangosol.net.topic.Publisher; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -71,15 +70,28 @@ public static class Publish @Override public Integer call() throws Exception { - System.err.println("***** In Publish.call()"); + Logger.info("***** Entered Publish.call()"); Publisher publisher = ensurePublisher(); int cChannel = publisher.getChannelCount(); - System.err.println("***** In Publish.call() - publishing " + cChannel + " messages"); + Logger.info("***** In Publish.call() - publishing " + cChannel + " messages"); for (int i = 0; i < cChannel; i++) { - publisher.publish("message-" + s_cMessage.getAndIncrement()); + int nMessage = s_cMessage.getAndIncrement(); + publisher.publish("message-" + nMessage) + .handle((status, err) -> + { + if (err == null) + { + Logger.info("***** In Publish.call() - Completed publish of message " + nMessage + " with status: " + status); + } + else + { + Logger.info("***** In Publish.call() - Completed publish of message " + nMessage + " with error: ", err); + } + return null; + }); } - System.err.println("***** In Publish.call() - published " + cChannel + " messages"); + Logger.info("***** Exiting Publish.call() - published " + cChannel + " messages"); return cChannel; } } diff --git a/prj/test/functional/topics/src/main/java/topics/bug_35945522/SubscriberMain.java b/prj/test/functional/topics/src/main/java/topics/bug_35945522/SubscriberMain.java index c6577cd79353a..d096834ba00a8 100644 --- a/prj/test/functional/topics/src/main/java/topics/bug_35945522/SubscriberMain.java +++ b/prj/test/functional/topics/src/main/java/topics/bug_35945522/SubscriberMain.java @@ -23,7 +23,6 @@ public class SubscriberMain implements Constants { - @SuppressWarnings("resource") public static void main(String[] args) throws Exception { try (Coherence coherence = Coherence.clusterMember().start().get(5, TimeUnit.MINUTES)) @@ -36,16 +35,22 @@ public static void main(String[] args) throws Exception try (Subscriber subscriber = session.createSubscriber(TOPIC_NAME, Subscriber.inGroup(GROUP_NAME))) { + int cChannel = subscriber.getChannelCount(); s_subscriber = subscriber; s_nSubscriberId = ((PagedTopicSubscriber) subscriber).getSubscriberId().getId(); + Logger.info("Created subscriber: " + s_subscriber); while (true) { try { - future = subscriber.receive(); - Logger.info("Calling receive..."); + future = subscriber.receive(); + Logger.info("Called receive: " + future + " subscriber " + subscriber); + for (int i = 0; i < cChannel; i++) + { + Logger.info("**** Channel (" + i + ") " + ((PagedTopicSubscriber) subscriber).getChannel(i)); + } Element element = future.get(cWaitSeconds, TimeUnit.SECONDS); Logger.info("Received message: " + element); s_cReceived++;