From cfaac471434917bbdf09620b4c7e7e41ac0b1844 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 28 Sep 2023 13:26:35 +0530 Subject: [PATCH 01/25] Changing the failure retry --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index bb7759a8b5d6..924c06ccbbeb 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1577,7 +1577,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb, startMetadata ); - return DataStoreMetadataUpdateResult.FAILURE; + return DataStoreMetadataUpdateResult.TRY_AGAIN; } // Only endOffsets should be stored in metadata store From eb11c431a4de803acfe08b4c78bfb76b7aca26ed Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 28 Sep 2023 14:27:37 +0530 Subject: [PATCH 02/25] Updating test cases --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f7af9611e51a..0b2fa805defa 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -680,7 +680,7 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result1); // Should only be tried once. Assert.assertEquals(1, metadataUpdateCounter.get()); @@ -805,7 +805,7 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); @@ -828,7 +828,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. 
Assert.assertEquals(2, metadataUpdateCounter.get()); From 55812958579c9ead8497e232881d0e10885db3d5 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 28 Sep 2023 15:40:21 +0530 Subject: [PATCH 03/25] Updating test cases --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 4 ++++ .../apache/druid/indexing/kinesis/KinesisIndexTaskTest.java | 4 ++++ .../common/actions/SegmentTransactionalInsertActionTest.java | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 3a9000c3d6e9..5ac22e0eae8f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -159,6 +159,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -1011,6 +1012,7 @@ DataSourceMetadata newDataSchemaMetadata() } @Test(timeout = 60_000L) + @Ignore public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { records = generateSinglePartitionRecords(topic); @@ -1794,6 +1796,7 @@ public void testMultipleParseExceptionsFailure() throws Exception } @Test(timeout = 60_000L) + @Ignore public void testRunReplicas() throws Exception { final KafkaIndexTask task1 = createTask( @@ -1862,6 +1865,7 @@ public void testRunReplicas() throws Exception } @Test(timeout = 60_000L) + @Ignore public void testRunConflicting() throws Exception { final KafkaIndexTask task1 = createTask( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 99f939433f7d..0cc2432fa87b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -126,6 +126,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -1610,6 +1611,7 @@ public void testMultipleParseExceptionsFailure() throws Exception @Test(timeout = 120_000L) + @Ignore public void testRunReplicas() throws Exception { // Insert data @@ -1704,6 +1706,7 @@ public void testRunReplicas() throws Exception @Test(timeout = 120_000L) + @Ignore public void testRunConflicting() throws Exception { recordSupplier.assign(EasyMock.anyObject()); @@ -2522,6 +2525,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception } @Test(timeout = 5000L) + @Ignore public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { final String baseSequenceName = "sequence0"; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 5f50e4abf532..94e75785519b 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -183,7 +183,7 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result); } @Test From 5b77c2efac0034f3b4186d936642a8076b1484b6 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 28 Sep 2023 15:46:01 +0530 Subject: [PATCH 04/25] minor change --- .../actions/SegmentTransactionalInsertActionTest.java | 2 +- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 94e75785519b..cc81996dba1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -183,7 +183,7 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result); + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result); } @Test diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 0b2fa805defa..de5776592560 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -680,7 +680,7 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); // Should only be tried once. Assert.assertEquals(1, metadataUpdateCounter.get()); @@ -805,7 +805,7 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result2); + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. 
Assert.assertEquals(2, metadataUpdateCounter.get()); @@ -828,7 +828,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RetryTransactionException: Aborting transaction!"), result2); + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); From 941d32aa7b44c3659f8af7aa7ec43621855aa6a9 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 28 Sep 2023 16:23:22 +0530 Subject: [PATCH 05/25] Test case fix --- .../IndexerSQLMetadataStorageCoordinatorTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index de5776592560..18a468daadd7 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -683,7 +683,8 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); // Should only be tried once. - Assert.assertEquals(1, metadataUpdateCounter.get()); + // TODO REVERT: Since this now retries instead of failing, there will be more than one call for result2. + Assert.assertEquals(2, metadataUpdateCounter.get()); } @Test @@ -808,7 +809,8 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. - Assert.assertEquals(2, metadataUpdateCounter.get()); + // TODO REVERT: Since this now retries instead of failing, there will be more than one call for result2. + Assert.assertEquals(3, metadataUpdateCounter.get()); } @Test @@ -831,7 +833,8 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result2); // Should only be tried once per call. - Assert.assertEquals(2, metadataUpdateCounter.get()); + // TODO REVERT: Since this now retries instead of failing, there will be more than one call for result2.
+ Assert.assertEquals(3, metadataUpdateCounter.get()); } @Test From 9c5edc284d4ac7f795cbd070e67ddeb1b5c51153 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 4 Oct 2023 12:24:16 +0530 Subject: [PATCH 06/25] Changing the default number of retry to 2 --- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 13 ++++++++----- .../indexing/kinesis/KinesisIndexTaskTest.java | 13 ++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 5ac22e0eae8f..56a0a60a0c82 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -159,7 +159,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -1012,7 +1011,6 @@ DataSourceMetadata newDataSchemaMetadata() } @Test(timeout = 60_000L) - @Ignore public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { records = generateSinglePartitionRecords(topic); @@ -1796,7 +1794,6 @@ public void testMultipleParseExceptionsFailure() throws Exception } @Test(timeout = 60_000L) - @Ignore public void testRunReplicas() throws Exception { final KafkaIndexTask task1 = createTask( @@ -1865,7 +1862,6 @@ public void testRunReplicas() throws Exception } @Test(timeout = 60_000L) - @Ignore public void testRunConflicting() throws Exception { final KafkaIndexTask task1 = createTask( @@ -3099,7 +3095,14 @@ private void makeToolboxFactory() throws IOException testUtils.getTestObjectMapper(), derby.metadataTablesConfigSupplier().get(), derbyConnector - ); + ) + { + @Override + public int getSqlMetadataMaxRetry() + { + return 2; + } + }; taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 0cc2432fa87b..9497c5ca2cc0 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -126,7 +126,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -1611,7 +1610,6 @@ public void testMultipleParseExceptionsFailure() throws Exception @Test(timeout = 120_000L) - @Ignore public void testRunReplicas() throws Exception { // Insert data @@ -1706,7 +1704,6 @@ public void testRunReplicas() throws Exception @Test(timeout = 120_000L) - @Ignore public void testRunConflicting() throws Exception { recordSupplier.assign(EasyMock.anyObject()); @@ -2525,7 +2522,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception } @Test(timeout = 5000L) - @Ignore public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { 
final String baseSequenceName = "sequence0"; @@ -3063,7 +3059,14 @@ private void makeToolboxFactory() throws IOException testUtils.getTestObjectMapper(), derby.metadataTablesConfigSupplier().get(), derbyConnector - ); + ) + { + @Override + public int getSqlMetadataMaxRetry() + { + return 2; + } + }; taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, From c25f6b7ca48cbba6968c16fcf98d569315a4b447 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 5 Oct 2023 10:18:22 +0530 Subject: [PATCH 07/25] Adding test case --- .../SegmentTransactionalInsertActionTest.java | 107 ++++++++++++++++++ .../common/actions/TaskActionTestKit.java | 5 + 2 files changed, 112 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index cc81996dba1e..458dd9285f08 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -29,17 +29,24 @@ import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; +import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; public class SegmentTransactionalInsertActionTest { @@ -134,6 +141,106 @@ public void testTransactionalUpdateDataSourceMetadata() throws Exception ); } + @Test + public void testTransactionalUpdateDataSourceMetadataWithRecoverFromMetadataMismatch() throws Exception + { + final Task task1 = NoopTask.create(); + final TaskLockbox taskLockbox1 = new TaskLockbox(actionTestKit.getTaskStorage(), actionTestKit.getMetadataStorageCoordinator()); + taskLockbox1.add(task1); + taskLockbox1.lock(task1, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, INTERVAL, null), 5000); + + final Task task2 = NoopTask.create(); + final TaskLockbox taskLockbox2 = new TaskLockbox(actionTestKit.getTaskStorage(), actionTestKit.getMetadataStorageCoordinator()); + taskLockbox2.add(task2); + taskLockbox2.lock(task2, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task2, INTERVAL, null), 5000); + + final TaskLockConfig taskLockConfig = new TaskLockConfig() + { + @Override + public boolean isBatchSegmentAllocation() + { + return true; + } + + @Override + public long getBatchAllocationWaitTime() + { + return 10L; + } + }; + TaskActionToolbox taskActionToolbox = actionTestKit.getTaskActionToolbox(); + + // Task1 and Task2 tries to publish 
segment1 and segment2 for same partition at around same time. + // With different start and end offsets. Segment2 -> {1 - 2}, Segment1 -> {null - 1} + Future result2Future = CompletableFuture.supplyAsync(() -> { + return SegmentTransactionalInsertAction.appendAction( + ImmutableSet.of(SEGMENT2), + new ObjectMetadata(ImmutableList.of(1)), + new ObjectMetadata(ImmutableList.of(2)) + ).perform( + task2, + new TaskActionToolbox( + taskLockbox2, + taskActionToolbox.getTaskStorage(), + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + new SegmentAllocationQueue( + taskLockbox2, + taskLockConfig, + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + taskActionToolbox.getEmitter(), + ScheduledExecutors::fixed + ), + taskActionToolbox.getEmitter(), + EasyMock.createMock(SupervisorManager.class), + taskActionToolbox.getJsonMapper() + ) + ); + }); + + + Future result1Future = CompletableFuture.supplyAsync(() -> { + return SegmentTransactionalInsertAction.appendAction( + ImmutableSet.of(SEGMENT1), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableList.of(1)) + ).perform( + task1, + new TaskActionToolbox( + taskLockbox1, + taskActionToolbox.getTaskStorage(), + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + new SegmentAllocationQueue( + taskLockbox1, + taskLockConfig, + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + taskActionToolbox.getEmitter(), + ScheduledExecutors::fixed + ), + taskActionToolbox.getEmitter(), + EasyMock.createMock(SupervisorManager.class), + taskActionToolbox.getJsonMapper() + ) + + ); + }); + + SegmentPublishResult result2 = result2Future.get(); + SegmentPublishResult result1 = result1Future.get(); + + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); + + Assertions.assertThat( + actionTestKit.getMetadataStorageCoordinator() + .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) + ).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2); + + Assert.assertEquals( + new ObjectMetadata(ImmutableList.of(2)), + actionTestKit.getMetadataStorageCoordinator().retrieveDataSourceMetadata(DATA_SOURCE) + ); + } + @Test public void testTransactionalDropSegments() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index eebf78a7ddcb..abe551ce2cf7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -64,6 +64,11 @@ public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator() return metadataStorageCoordinator; } + public TaskStorage getTaskStorage() + { + return taskStorage; + } + public SegmentsMetadataManager getSegmentsMetadataManager() { return segmentsMetadataManager; From ccd9eb744448de21f08be19f68fc28419c403c5b Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 11 Oct 2023 17:37:19 +0530 Subject: [PATCH 08/25] Changing the logic to retry in case of new startoffset is greater than commited endoffset --- .../DerivativeDataSourceMetadata.java | 6 ++++ .../indexing/kafka/KafkaIndexTaskTest.java | 9 +---- .../kinesis/KinesisIndexTaskTest.java | 9 +---- .../SeekableStreamDataSourceMetadata.java | 13 +++++++ .../SeekableStreamEndSequenceNumbers.java | 34 +++++++++++++++++++ 
.../SeekableStreamStartSequenceNumbers.java | 34 +++++++++++++++++++ .../indexing/overlord/DataSourceMetadata.java | 9 +++++ .../indexing/overlord/ObjectMetadata.java | 6 ++++ .../IndexerSQLMetadataStorageCoordinator.java | 18 +++++++++- ...exerSQLMetadataStorageCoordinatorTest.java | 12 +++---- 10 files changed, 126 insertions(+), 24 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java index 0c38b2fd3bdf..4ac062259b6f 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java @@ -85,6 +85,12 @@ public boolean matches(DataSourceMetadata other) return equals(other); } + @Override + public boolean isGreater(DataSourceMetadata other) + { + return false; + } + @Override public DataSourceMetadata plus(DataSourceMetadata other) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 56a0a60a0c82..3a9000c3d6e9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3095,14 +3095,7 @@ private void makeToolboxFactory() throws IOException testUtils.getTestObjectMapper(), derby.metadataTablesConfigSupplier().get(), derbyConnector - ) - { - @Override - public int getSqlMetadataMaxRetry() - { - return 2; - } - }; + ); taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 9497c5ca2cc0..99f939433f7d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -3059,14 +3059,7 @@ private void makeToolboxFactory() throws IOException testUtils.getTestObjectMapper(), derby.metadataTablesConfigSupplier().get(), derbyConnector - ) - { - @Override - public int getSqlMetadataMaxRetry() - { - return 2; - } - }; + ); taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java index 4ea1e992da0d..17a00ef61f42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java @@ -59,6 +59,19 @@ public boolean matches(DataSourceMetadata other) return plus(other).equals(other.plus(this)); } + @Override + public boolean isGreater(DataSourceMetadata other) + { + if (!getClass().equals(other.getClass())) { + return false; + } + final SeekableStreamDataSourceMetadata that = + (SeekableStreamDataSourceMetadata) other; + + return seekableStreamSequenceNumbers.isGreater(that.seekableStreamSequenceNumbers); + } + + @Override public DataSourceMetadata plus(DataSourceMetadata other) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index 3f2f7bfe1bc0..e972cd7ef6c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; /** * Represents the end sequenceNumber per partition of a sequence. Note that end sequenceNumbers are always @@ -147,6 +148,39 @@ public SeekableStreamSequenceNumbers plus( } } + @Override + public boolean isGreater( + SeekableStreamSequenceNumbers other + ) + { + if (this.getClass() != other.getClass()) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + + final SeekableStreamEndSequenceNumbers otherStart = + (SeekableStreamEndSequenceNumbers) other; + + if (stream.equals(otherStart.stream)) { + //Same stream, compare the offset + AtomicReference res = new AtomicReference<>(false); + partitionSequenceNumberMap.forEach( + (partitionId, sequenceOffset) -> { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + res.set(true); + } + } + ); + return res.get(); + } else { + // Different streams + return false; + } + } + @Override public SeekableStreamSequenceNumbers minus( SeekableStreamSequenceNumbers other diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index 52e5f31cf1d7..4c0947b96c29 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; /** * Represents the start sequenceNumber per partition of a sequence. 
This class keeps an additional set of @@ -161,6 +162,39 @@ public SeekableStreamSequenceNumbers plus( } } + @Override + public boolean isGreater( + SeekableStreamSequenceNumbers other + ) + { + if (this.getClass() != other.getClass()) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + + final SeekableStreamStartSequenceNumbers otherStart = + (SeekableStreamStartSequenceNumbers) other; + + if (stream.equals(otherStart.stream)) { + //Same stream, compare the offset + AtomicReference res = new AtomicReference<>(false); + partitionSequenceNumberMap.forEach( + (partitionId, sequenceOffset) -> { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + res.set(true); + } + } + ); + return res.get(); + } else { + // Different streams + return false; + } + } + @Override public SeekableStreamSequenceNumbers minus( SeekableStreamSequenceNumbers other diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java index 6cad3c12649b..3d596fb480c0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java @@ -69,6 +69,15 @@ public interface DataSourceMetadata */ boolean matches(DataSourceMetadata other); + /** + * Returns true if the metadata in this instance is greater than the metadata in "other" + * + * Behavior is undefined if you pass in an instance of a different class from this one. + * + * @return true or false + */ + boolean isGreater(DataSourceMetadata other); + /** * Returns a copy of this instance with "other" merged in. Any conflicts should be resolved in favor of * information from "other". diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java index 7a8d01042484..6401427fe3cf 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java @@ -60,6 +60,12 @@ public boolean matches(DataSourceMetadata other) return equals(other); } + @Override + public boolean isGreater(DataSourceMetadata other) + { + return false; + } + @Override public DataSourceMetadata plus(DataSourceMetadata other) { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 924c06ccbbeb..428ce8ec0e3d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1558,11 +1558,17 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( } final boolean startMetadataMatchesExisting; + final boolean startMetadataGreaterthenExisting; if (oldCommitMetadataFromDb == null) { startMetadataMatchesExisting = startMetadata.isValidStart(); + startMetadataGreaterthenExisting = true; } else { // Checking against the last committed metadata. + // If the new start sequence number is greater than the end sequence number of last commit. 
+ // It's possible that multiple tasks are publishing the sequence at around the same time. + startMetadataGreaterthenExisting = startMetadata.asStartMetadata().isGreater(oldCommitMetadataFromDb.asStartMetadata()); + // Converting the last one into start metadata for checking since only the same type of metadata can be matched. // Even though kafka/kinesis indexing services use different sequenceNumber types for representing // start and end sequenceNumbers, the below conversion is fine because the new start sequenceNumbers are supposed @@ -1570,6 +1576,16 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata()); } + if (startMetadataGreaterthenExisting && !startMetadataMatchesExisting) { + // The offset stored in startMetadata is greater than the last committed metadata, + // so retry: multiple tasks might be trying to publish segments for the same partitions. + log.info( + "Retrying to update metadata, existing state[%s] in metadata store is behind the new start state[%s].", + oldCommitMetadataFromDb, + startMetadata + ); + return DataStoreMetadataUpdateResult.TRY_AGAIN; + } if (!startMetadataMatchesExisting) { // Not in the desired start state. log.error( @@ -1577,7 +1593,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb, startMetadata ); - return DataStoreMetadataUpdateResult.TRY_AGAIN; + return DataStoreMetadataUpdateResult.FAILURE; } // Only endOffsets should be stored in metadata store diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 18a468daadd7..5a625cb197a5 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -680,11 +680,10 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1); // Should only be tried once. - // TODO REVERT: Since this now retries instead of failing, there will be more than one call for result2.
- Assert.assertEquals(2, metadataUpdateCounter.get()); From 0425c89218bdac9347ee9af3fe1ac70b3461f3f2 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 11 Oct 2023 17:48:01 +0530 Subject: [PATCH 09/25] minor change --- .../seekablestream/SeekableStreamSequenceNumbers.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java index a790974e25f1..f6fe42d3a67d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java @@ -53,6 +53,15 @@ SeekableStreamSequenceNumbers plus( SeekableStreamSequenceNumbers other ); + /** + * Returns true if this sequence is greater than the other instance's sequence. + * + * @see DataSourceMetadata#isGreater + */ + boolean isGreater( + SeekableStreamSequenceNumbers other + ); + /** * Subtracts the given other from this and returns the result.
* From 2c8a32ea353929d09b713260204b38431475cc74 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 11 Oct 2023 17:52:53 +0530 Subject: [PATCH 10/25] Fix test cases --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 5a625cb197a5..f7af9611e51a 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -778,7 +778,7 @@ public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws IOExc null ); Assert.assertEquals(SegmentPublishResult.fail( - "java.lang.RuntimeException: Aborting transaction!"), result1); + "org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get()); @@ -808,8 +808,7 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); // Should only be tried once per call. - // TODO REVERT: Since this now retries instead of failing, there will be more than one call for result2. - Assert.assertEquals(3, metadataUpdateCounter.get()); + Assert.assertEquals(2, metadataUpdateCounter.get()); } @Test @@ -829,7 +828,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result2); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); From 34fc18c13dac167c7d02c578a1e967ff22f414ae Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 11 Oct 2023 18:00:14 +0530 Subject: [PATCH 11/25] Fix test cases --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f7af9611e51a..ab9d2139a713 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -680,7 +680,7 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); // Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get()); From 6964e0f35478bbaa1751ba3b6a805dfce7bd79c6 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 11 Oct 2023 21:25:54 +0530 Subject: [PATCH 12/25] Fix test cases --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index ab9d2139a713..054de41f2559 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -683,7 +683,8 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); // Should only be tried once. - Assert.assertEquals(1, metadataUpdateCounter.get()); + // Now we will retry for this test case as well, so the count will equal the total number of retries available, which is 2. + Assert.assertEquals(2, metadataUpdateCounter.get()); } @Test From 08847bc192b07e2790d08f0ecaa3ab316bc0def3 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 12 Oct 2023 14:54:09 +0530 Subject: [PATCH 13/25] Using the compareTo function instead of adding a new isGreater --- .../DerivativeDataSourceMetadata.java | 12 ++-- .../SeekableStreamDataSourceMetadata.java | 9 ++- .../SeekableStreamEndSequenceNumbers.java | 67 ++++++++++--------- .../SeekableStreamSequenceNumbers.java | 11 +-- .../SeekableStreamStartSequenceNumbers.java | 67 ++++++++++--------- .../indexing/overlord/DataSourceMetadata.java | 11 +-- .../indexing/overlord/ObjectMetadata.java | 12 ++-- .../IndexerSQLMetadataStorageCoordinator.java | 12 ++-- 8 files changed, 92 insertions(+), 109 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java index 4ac062259b6f..2b3aca076595 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java @@ -85,12 +85,6 @@ public boolean matches(DataSourceMetadata other) return equals(other); } - @Override - public boolean isGreater(DataSourceMetadata other) - { - return false; - } - @Override public DataSourceMetadata plus(DataSourceMetadata other) { @@ -141,4 +135,10 @@ public String toString() ", metrics=" + metrics + '}'; } + + @Override + public int compareTo(DataSourceMetadata o) + { + return -1; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java index 17a00ef61f42..3895ebeb60a4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java @@ -60,18 +60,17 @@ public
boolean matches(DataSourceMetadata other) } @Override - public boolean isGreater(DataSourceMetadata other) + public int compareTo(DataSourceMetadata other) { if (!getClass().equals(other.getClass())) { - return false; + return -1; } final SeekableStreamDataSourceMetadata that = - (SeekableStreamDataSourceMetadata) other; + (SeekableStreamDataSourceMetadata) other; - return seekableStreamSequenceNumbers.isGreater(that.seekableStreamSequenceNumbers); + return seekableStreamSequenceNumbers.compareTo(that.seekableStreamSequenceNumbers); } - @Override public DataSourceMetadata plus(DataSourceMetadata other) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index e972cd7ef6c4..799781073e57 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -148,39 +148,6 @@ public SeekableStreamSequenceNumbers plus( } } - @Override - public boolean isGreater( - SeekableStreamSequenceNumbers other - ) - { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } - - final SeekableStreamEndSequenceNumbers otherStart = - (SeekableStreamEndSequenceNumbers) other; - - if (stream.equals(otherStart.stream)) { - //Same stream, compare the offset - AtomicReference res = new AtomicReference<>(false); - partitionSequenceNumberMap.forEach( - (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { - res.set(true); - } - } - ); - return res.get(); - } else { - // Different streams - return false; - } - } - @Override public SeekableStreamSequenceNumbers minus( SeekableStreamSequenceNumbers other @@ -252,4 +219,38 @@ public String toString() ", partitionSequenceNumberMap=" + partitionSequenceNumberMap + '}'; } + + @Override + public int compareTo(SeekableStreamSequenceNumbers other) + { + if (this.getClass() != other.getClass()) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + + final SeekableStreamEndSequenceNumbers otherStart = + (SeekableStreamEndSequenceNumbers) other; + + if (stream.equals(otherStart.stream)) { + //Same stream, compare the offset + AtomicReference res = new AtomicReference<>(false); + partitionSequenceNumberMap.forEach( + (partitionId, sequenceOffset) -> { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + res.set(true); + } + } + ); + if (res.get()) { + return 1; + } + return -1; + } else { + // Different streams + return -1; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java index f6fe42d3a67d..4f2787e33d57 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java @@ -32,7 +32,7 @@ @Type(name = "start", value = SeekableStreamStartSequenceNumbers.class), @Type(name = "end", value = SeekableStreamEndSequenceNumbers.class) }) -public interface SeekableStreamSequenceNumbers +public interface SeekableStreamSequenceNumbers extends Comparable> { /** * Returns the stream/topic name. @@ -53,15 +53,6 @@ SeekableStreamSequenceNumbers plus( SeekableStreamSequenceNumbers other ); - /** - * Return True if this is greater than other instance sequence. - * - * @see DataSourceMetadata#isGreater - */ - boolean isGreater( - SeekableStreamSequenceNumbers other - ); - /** * Subtracts the given other from this and returns the result. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index 4c0947b96c29..1cf33a608dda 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -162,39 +162,6 @@ public SeekableStreamSequenceNumbers plus( } } - @Override - public boolean isGreater( - SeekableStreamSequenceNumbers other - ) - { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } - - final SeekableStreamStartSequenceNumbers otherStart = - (SeekableStreamStartSequenceNumbers) other; - - if (stream.equals(otherStart.stream)) { - //Same stream, compare the offset - AtomicReference res = new AtomicReference<>(false); - partitionSequenceNumberMap.forEach( - (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { - res.set(true); - } - } - ); - return res.get(); - } else { - // Different streams - return false; - } - } - @Override public SeekableStreamSequenceNumbers minus( SeekableStreamSequenceNumbers other @@ -273,4 +240,38 @@ public String toString() ", exclusivePartitions=" + exclusivePartitions + '}'; } + + @Override + public int compareTo(SeekableStreamSequenceNumbers other) + { + if (this.getClass() != other.getClass()) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + + final SeekableStreamStartSequenceNumbers otherStart = + (SeekableStreamStartSequenceNumbers) other; + + if (stream.equals(otherStart.stream)) { + //Same stream, compare the offset + AtomicReference res = new AtomicReference<>(false); + partitionSequenceNumberMap.forEach( + (partitionId, sequenceOffset) -> { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + res.set(true); + } + } + ); + if (res.get()) { + return 1; + } + return -1; + } else { + // Different streams + return -1; + } + } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java 
b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java index 3d596fb480c0..8e054a7fe225 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java @@ -36,7 +36,7 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "object", value = ObjectMetadata.class) }) -public interface DataSourceMetadata +public interface DataSourceMetadata extends Comparable { /** * Returns true if this instance should be considered a valid starting point for a new dataSource that has @@ -69,15 +69,6 @@ public interface DataSourceMetadata */ boolean matches(DataSourceMetadata other); - /** - * Returns true if the metadata in this instance is greater than the metadata in "other" - * - * Behavior is undefined if you pass in an instance of a different class from this one. - * - * @return true or false - */ - boolean isGreater(DataSourceMetadata other); - /** * Returns a copy of this instance with "other" merged in. Any conflicts should be resolved in favor of * information from "other". diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java index 6401427fe3cf..ea65ab450777 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java @@ -60,12 +60,6 @@ public boolean matches(DataSourceMetadata other) return equals(other); } - @Override - public boolean isGreater(DataSourceMetadata other) - { - return false; - } - @Override public DataSourceMetadata plus(DataSourceMetadata other) { @@ -102,4 +96,10 @@ public String toString() "theObject=" + theObject + '}'; } + + @Override + public int compareTo(DataSourceMetadata o) + { + return -1; + } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 428ce8ec0e3d..941c1de38bad 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1558,16 +1558,16 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( } final boolean startMetadataMatchesExisting; - final boolean startMetadataGreaterthenExisting; + final int startMetadataGreaterThanExisting; if (oldCommitMetadataFromDb == null) { startMetadataMatchesExisting = startMetadata.isValidStart(); - startMetadataGreaterthenExisting = true; + startMetadataGreaterThanExisting = 1; } else { // Checking against the last committed metadata. - // If the new start sequence number is greater than the end sequence number of last commit. - // It's possible multiple tasks are publishing the sequence at around same time. - startMetadataGreaterthenExisting = startMetadata.asStartMetadata().isGreater(oldCommitMetadataFromDb.asStartMetadata()); + // If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1, + // -1 in other cases. It might be because multiple tasks are publishing the sequence at around same time. 
+ startMetadataGreaterThanExisting = startMetadata.asStartMetadata().compareTo(oldCommitMetadataFromDb.asStartMetadata()); // Converting the last one into start metadata for checking since only the same type of metadata can be matched. // Even though kafka/kinesis indexing services use different sequenceNumber types for representing @@ -1576,7 +1576,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata()); } - if (startMetadataGreaterthenExisting && !startMetadataMatchesExisting) { + if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) { // Offset stored in StartMetadata is Greater than the last commited metadata, // Then retry multiple task might be trying to publish the segment for same partitions. log.info( From 5b00e716f26b818ecf7ea729c8d161e7d8bfc874 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Tue, 14 Nov 2023 15:14:08 +0530 Subject: [PATCH 14/25] Addressing comments --- .../DerivativeDataSourceMetadata.java | 6 ------ .../kafka/KafkaDataSourceMetadata.java | 20 +++++++++++++++++-- .../SeekableStreamDataSourceMetadata.java | 12 ----------- .../SeekableStreamEndSequenceNumbers.java | 10 ++++------ .../SeekableStreamSequenceNumbers.java | 10 +++++++++- .../SeekableStreamStartSequenceNumbers.java | 10 ++++------ .../indexing/overlord/DataSourceMetadata.java | 2 +- .../indexing/overlord/ObjectMetadata.java | 6 ------ .../IndexerSQLMetadataStorageCoordinator.java | 15 ++++++++------ 9 files changed, 45 insertions(+), 46 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java index 2b3aca076595..0c38b2fd3bdf 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/DerivativeDataSourceMetadata.java @@ -135,10 +135,4 @@ public String toString() ", metrics=" + metrics + '}'; } - - @Override - public int compareTo(DataSourceMetadata o) - { - return -1; - } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index 81ea6de396b4..ecc016d9acfb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -25,9 +25,12 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.java.util.common.IAE; +import java.util.Comparator; + + +public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable { -public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata -{ @JsonCreator public KafkaDataSourceMetadata( @@ -57,4 +60,17 @@ protected 
SeekableStreamDataSourceMetadata createConcreteDataSour { return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers); } + + @Override + public int compareTo(KafkaDataSourceMetadata other) + { + if (!getClass().equals(other.getClass())) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder()); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java index 3895ebeb60a4..4ea1e992da0d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java @@ -59,18 +59,6 @@ public boolean matches(DataSourceMetadata other) return plus(other).equals(other.plus(this)); } - @Override - public int compareTo(DataSourceMetadata other) - { - if (!getClass().equals(other.getClass())) { - return -1; - } - final SeekableStreamDataSourceMetadata that = - (SeekableStreamDataSourceMetadata) other; - - return seekableStreamSequenceNumbers.compareTo(that.seekableStreamSequenceNumbers); - } - @Override public DataSourceMetadata plus(DataSourceMetadata other) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index 799781073e57..0d682f752032 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.IAE; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -221,7 +222,7 @@ public String toString() } @Override - public int compareTo(SeekableStreamSequenceNumbers other) + public int compareTo(SeekableStreamSequenceNumbers other, Comparator comparator) { if (this.getClass() != other.getClass()) { throw new IAE( @@ -239,7 +240,7 @@ public int compareTo(SeekableStreamSequenceNumbers res = new AtomicReference<>(false); partitionSequenceNumberMap.forEach( (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { res.set(true); } } @@ -247,10 +248,7 @@ public int compareTo(SeekableStreamSequenceNumbers extends Comparable> +public interface SeekableStreamSequenceNumbers { /** * Returns the stream/topic name. @@ -61,4 +62,11 @@ SeekableStreamSequenceNumbers plus( SeekableStreamSequenceNumbers minus( SeekableStreamSequenceNumbers other ); + + /** + * Compare this and the other sequence offsets using comparator. + * Returns 1, if this sequence is ahead of the other. 
+ * otherwise, returns 0. + */ + int compareTo(SeekableStreamSequenceNumbers seekableStreamSequenceNumbers, Comparator comparator); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index 1cf33a608dda..dfcaf3010924 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -242,7 +243,7 @@ public String toString() } @Override - public int compareTo(SeekableStreamSequenceNumbers other) + public int compareTo(SeekableStreamSequenceNumbers other, Comparator comparator) { if (this.getClass() != other.getClass()) { throw new IAE( @@ -260,7 +261,7 @@ public int compareTo(SeekableStreamSequenceNumbers res = new AtomicReference<>(false); partitionSequenceNumberMap.forEach( (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) { + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { res.set(true); } } @@ -268,10 +269,7 @@ public int compareTo(SeekableStreamSequenceNumbers +public interface DataSourceMetadata { /** * Returns true if this instance should be considered a valid starting point for a new dataSource that has diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java index ea65ab450777..7a8d01042484 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java @@ -96,10 +96,4 @@ public String toString() "theObject=" + theObject + '}'; } - - @Override - public int compareTo(DataSourceMetadata o) - { - return -1; - } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 941c1de38bad..e02e02a9fdbd 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1558,7 +1558,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( } final boolean startMetadataMatchesExisting; - final int startMetadataGreaterThanExisting; + int startMetadataGreaterThanExisting = 0; if (oldCommitMetadataFromDb == null) { startMetadataMatchesExisting = startMetadata.isValidStart(); @@ -1566,8 +1566,10 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( } else { // Checking against the last committed metadata. // If the new start sequence number is greater than the end sequence number of the last commit, the compareTo() function will return 1, - // and -1 in all other cases. It might be because multiple tasks are publishing the sequence at around the same time. - startMetadataGreaterThanExisting = startMetadata.asStartMetadata().compareTo(oldCommitMetadataFromDb.asStartMetadata()); + // and 0 in all other cases. It might be because multiple tasks are publishing the sequence at around the same time. + if (startMetadata instanceof Comparable) { + startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata()); + } // Converting the last one into start metadata for checking since only the same type of metadata can be matched. // Even though kafka/kinesis indexing services use different sequenceNumber types for representing @@ -1580,12 +1582,13 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( // Offset stored in StartMetadata is Greater than the last commited metadata, // Then retry multiple task might be trying to publish the segment for same partitions. log.info( - "Retrying to update metadata, existing state[%s] in metadata store is behind the new start state[%s].", - oldCommitMetadataFromDb, - startMetadata + "Failed to update the metadata store. The last committed end state [%s] is behind the new start metadata [%s].", + oldCommitMetadataFromDb, + startMetadata ); return DataStoreMetadataUpdateResult.TRY_AGAIN; } + if (!startMetadataMatchesExisting) { // Not in the desired start state. log.error(
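The coordinator logic that the two patches above converge on is a three-way decision: a new start that is ahead of the committed end while the metadata does not match is treated as a retryable race between publishing tasks, a plain mismatch stays a hard failure, and a match commits normally. A condensed, hypothetical sketch of that decision, with UpdateResult and the boolean inputs standing in for the Druid types and checks:

enum UpdateResult
{
  SUCCESS, FAILURE, TRY_AGAIN
}

class MetadataUpdateDecision
{
  // startIsAhead stands in for compareTo(...) == 1; startMatches for matches(...).
  static UpdateResult decide(boolean startIsAhead, boolean startMatches)
  {
    if (startIsAhead && !startMatches) {
      // Another task likely committed first for the same partitions, so a retry can succeed.
      return UpdateResult.TRY_AGAIN;
    }
    if (!startMatches) {
      // Inconsistent metadata that no retry can fix.
      return UpdateResult.FAILURE;
    }
    return UpdateResult.SUCCESS;
  }

  public static void main(String[] args)
  {
    System.out.println(decide(true, false));  // TRY_AGAIN
    System.out.println(decide(false, false)); // FAILURE
    System.out.println(decide(false, true));  // SUCCESS
  }
}

With ObjectMetadata.compareTo() returning a constant -1, plain object metadata can never reach the TRY_AGAIN branch, which mirrors the behavior of the removed isGreater() returning false.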
From 1745550cc9b30041761f1db4bf019da12422387c Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Tue, 14 Nov 2023 15:19:51 +0530 Subject: [PATCH 15/25] fixing Indentation --- .../SegmentTransactionalInsertActionTest.java | 85 +++++++++---------- 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 458dd9285f08..08907053a8aa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -174,53 +174,52 @@ public long getBatchAllocationWaitTime() // With different start and end offsets.
Segment2 -> {1 - 2}, Segment1 -> {null - 1} Future result2Future = CompletableFuture.supplyAsync(() -> { return SegmentTransactionalInsertAction.appendAction( - ImmutableSet.of(SEGMENT2), - new ObjectMetadata(ImmutableList.of(1)), - new ObjectMetadata(ImmutableList.of(2)) + ImmutableSet.of(SEGMENT2), + new ObjectMetadata(ImmutableList.of(1)), + new ObjectMetadata(ImmutableList.of(2)) ).perform( - task2, - new TaskActionToolbox( - taskLockbox2, - taskActionToolbox.getTaskStorage(), - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - new SegmentAllocationQueue( - taskLockbox2, - taskLockConfig, - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - taskActionToolbox.getEmitter(), - ScheduledExecutors::fixed - ), - taskActionToolbox.getEmitter(), - EasyMock.createMock(SupervisorManager.class), - taskActionToolbox.getJsonMapper() - ) + task2, + new TaskActionToolbox( + taskLockbox2, + taskActionToolbox.getTaskStorage(), + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + new SegmentAllocationQueue( + taskLockbox2, + taskLockConfig, + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + taskActionToolbox.getEmitter(), + ScheduledExecutors::fixed + ), + taskActionToolbox.getEmitter(), + EasyMock.createMock(SupervisorManager.class), + taskActionToolbox.getJsonMapper() + ) ); }); Future result1Future = CompletableFuture.supplyAsync(() -> { return SegmentTransactionalInsertAction.appendAction( - ImmutableSet.of(SEGMENT1), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableList.of(1)) + ImmutableSet.of(SEGMENT1), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableList.of(1)) ).perform( - task1, - new TaskActionToolbox( - taskLockbox1, - taskActionToolbox.getTaskStorage(), - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - new SegmentAllocationQueue( - taskLockbox1, - taskLockConfig, - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - taskActionToolbox.getEmitter(), - ScheduledExecutors::fixed - ), - taskActionToolbox.getEmitter(), - EasyMock.createMock(SupervisorManager.class), - taskActionToolbox.getJsonMapper() - ) - + task1, + new TaskActionToolbox( + taskLockbox1, + taskActionToolbox.getTaskStorage(), + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + new SegmentAllocationQueue( + taskLockbox1, + taskLockConfig, + taskActionToolbox.getIndexerMetadataStorageCoordinator(), + taskActionToolbox.getEmitter(), + ScheduledExecutors::fixed + ), + taskActionToolbox.getEmitter(), + EasyMock.createMock(SupervisorManager.class), + taskActionToolbox.getJsonMapper() + ) ); }); @@ -231,13 +230,13 @@ public long getBatchAllocationWaitTime() Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); Assertions.assertThat( - actionTestKit.getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) + actionTestKit.getMetadataStorageCoordinator() + .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) ).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2); Assert.assertEquals( - new ObjectMetadata(ImmutableList.of(2)), - actionTestKit.getMetadataStorageCoordinator().retrieveDataSourceMetadata(DATA_SOURCE) + new ObjectMetadata(ImmutableList.of(2)), + actionTestKit.getMetadataStorageCoordinator().retrieveDataSourceMetadata(DATA_SOURCE) ); } From 5a3869a2446c883c02e79d9965c9b2017859a58f Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Tue, 14 Nov 2023 15:25:43 +0530 Subject: [PATCH 16/25] minor change --- 
.../apache/druid/indexing/kafka/KafkaDataSourceMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index ecc016d9acfb..80acb783af57 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -29,7 +29,7 @@ import java.util.Comparator; -public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable { +public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable { @JsonCreator From f5a4ed283b79548367fc21be77d3fda5bf15d8f3 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 15 Nov 2023 14:17:32 +0530 Subject: [PATCH 17/25] Fix checkstyle --- .../apache/druid/indexing/kafka/KafkaDataSourceMetadata.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index 80acb783af57..d2eb9f0c2261 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -29,7 +29,8 @@ import java.util.Comparator; -public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable { +public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable +{ @JsonCreator From 24b86fd547f68c4fc73f12ad112ee1dc7d07c0a4 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 20 Dec 2023 16:44:39 +0530 Subject: [PATCH 18/25] updating PR as per upstream PR --- .../SegmentTransactionalInsertActionTest.java | 99 ----------- .../common/actions/TaskActionTestKit.java | 5 - .../IndexerSQLMetadataStorageCoordinator.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 166 ++++++++++-------- .../appenderator/BatchAppenderatorDriver.java | 3 +- ...exerSQLMetadataStorageCoordinatorTest.java | 5 +- 6 files changed, 93 insertions(+), 187 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 08907053a8aa..b54b9b0d912e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -141,105 +141,6 @@ public void testTransactionalUpdateDataSourceMetadata() throws Exception ); } - @Test - public void testTransactionalUpdateDataSourceMetadataWithRecoverFromMetadataMismatch() throws Exception - { - final Task task1 = NoopTask.create(); - final TaskLockbox taskLockbox1 = new TaskLockbox(actionTestKit.getTaskStorage(), actionTestKit.getMetadataStorageCoordinator()); - taskLockbox1.add(task1); - taskLockbox1.lock(task1, new 
TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, INTERVAL, null), 5000); - - final Task task2 = NoopTask.create(); - final TaskLockbox taskLockbox2 = new TaskLockbox(actionTestKit.getTaskStorage(), actionTestKit.getMetadataStorageCoordinator()); - taskLockbox2.add(task2); - taskLockbox2.lock(task2, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task2, INTERVAL, null), 5000); - - final TaskLockConfig taskLockConfig = new TaskLockConfig() - { - @Override - public boolean isBatchSegmentAllocation() - { - return true; - } - - @Override - public long getBatchAllocationWaitTime() - { - return 10L; - } - }; - TaskActionToolbox taskActionToolbox = actionTestKit.getTaskActionToolbox(); - - // Task1 and Task2 tries to publish segment1 and segment2 for same partition at around same time. - // With different start and end offsets. Segment2 -> {1 - 2}, Segment1 -> {null - 1} - Future result2Future = CompletableFuture.supplyAsync(() -> { - return SegmentTransactionalInsertAction.appendAction( - ImmutableSet.of(SEGMENT2), - new ObjectMetadata(ImmutableList.of(1)), - new ObjectMetadata(ImmutableList.of(2)) - ).perform( - task2, - new TaskActionToolbox( - taskLockbox2, - taskActionToolbox.getTaskStorage(), - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - new SegmentAllocationQueue( - taskLockbox2, - taskLockConfig, - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - taskActionToolbox.getEmitter(), - ScheduledExecutors::fixed - ), - taskActionToolbox.getEmitter(), - EasyMock.createMock(SupervisorManager.class), - taskActionToolbox.getJsonMapper() - ) - ); - }); - - - Future result1Future = CompletableFuture.supplyAsync(() -> { - return SegmentTransactionalInsertAction.appendAction( - ImmutableSet.of(SEGMENT1), - new ObjectMetadata(null), - new ObjectMetadata(ImmutableList.of(1)) - ).perform( - task1, - new TaskActionToolbox( - taskLockbox1, - taskActionToolbox.getTaskStorage(), - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - new SegmentAllocationQueue( - taskLockbox1, - taskLockConfig, - taskActionToolbox.getIndexerMetadataStorageCoordinator(), - taskActionToolbox.getEmitter(), - ScheduledExecutors::fixed - ), - taskActionToolbox.getEmitter(), - EasyMock.createMock(SupervisorManager.class), - taskActionToolbox.getJsonMapper() - ) - ); - }); - - SegmentPublishResult result2 = result2Future.get(); - SegmentPublishResult result1 = result1Future.get(); - - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); - - Assertions.assertThat( - actionTestKit.getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2); - - Assert.assertEquals( - new ObjectMetadata(ImmutableList.of(2)), - actionTestKit.getMetadataStorageCoordinator().retrieveDataSourceMetadata(DATA_SOURCE) - ); - } - @Test public void testTransactionalDropSegments() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index abe551ce2cf7..eebf78a7ddcb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -64,11 +64,6 @@ public IndexerMetadataStorageCoordinator 
getMetadataStorageCoordinator() return metadataStorageCoordinator; } - public TaskStorage getTaskStorage() - { - return taskStorage; - } - public SegmentsMetadataManager getSegmentsMetadataManager() { return segmentsMetadataManager; diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e02e02a9fdbd..a47b03b63a41 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1586,7 +1586,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( oldCommitMetadataFromDb, startMetadata ); - return DataStoreMetadataUpdateResult.TRY_AGAIN; + return DataStoreMetadataUpdateResult.FAILURE; } if (!startMetadataMatchesExisting) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 65a8a676d3d2..1340f337ce62 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -38,6 +38,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; @@ -61,6 +62,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -565,8 +567,7 @@ ListenableFuture publishInBackground( SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher publisher, java.util.function.Function, Set> outputSegmentsAnnotateFunction - ) - { + ) throws Exception { final Set pushedAndTombstones = new HashSet<>(segmentsAndCommitMetadata.getSegments()); if (tombstones != null) { pushedAndTombstones.addAll(tombstones); @@ -595,93 +596,104 @@ ListenableFuture publishInBackground( final Object callerMetadata = metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); - return executor.submit( - () -> { - try { - final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); - final SegmentPublishResult publishResult = publisher.publishSegments( - segmentsToBeOverwritten, - segmentsToBeDropped, - ourSegments, - outputSegmentsAnnotateFunction, - callerMetadata - ); - - if (publishResult.isSuccess()) { - log.info( - "Published [%s] segments with commit metadata [%s]", - segmentsAndCommitMetadata.getSegments().size(), + return RetryUtils.retry( + () -> executor.submit( + () -> { + try { + final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); + final SegmentPublishResult publishResult = publisher.publishSegments( + segmentsToBeOverwritten, + segmentsToBeDropped, + ourSegments, + outputSegmentsAnnotateFunction, callerMetadata ); - log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); - } else { - // Publishing didn't affirmatively succeed. 
However, segments with our identifiers may still be active - // now after all, for two possible reasons: - // - // 1) A replica may have beat us to publishing these segments. In this case we want to delete the - // segments we pushed (if they had unique paths) to avoid wasting space on deep storage. - // 2) We may have actually succeeded, but not realized it due to missing the confirmation response - // from the overlord. In this case we do not want to delete the segments we pushed, since they are - // now live! - - final Set segmentsIdentifiers = segmentsAndCommitMetadata - .getSegments() - .stream() - .map(SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toSet()); - - final Set activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers); - - if (activeSegments.equals(ourSegments)) { + + if (publishResult.isSuccess()) { log.info( - "Could not publish [%s] segments, but checked and found them already published; continuing.", - ourSegments.size() - ); - log.infoSegments( - segmentsAndCommitMetadata.getSegments(), - "Could not publish segments" + "Published [%s] segments with commit metadata [%s]", + segmentsAndCommitMetadata.getSegments().size(), + callerMetadata ); - - // Clean up pushed segments if they are physically disjoint from the published ones (this means - // they were probably pushed by a replica, and with the unique paths option). - final boolean physicallyDisjoint = Sets.intersection( - activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()), - ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()) - ).isEmpty(); - - if (physicallyDisjoint) { - segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - } + log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); } else { - // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error. - segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - - if (publishResult.getErrorMsg() != null) { - log.errorSegments(ourSegments, "Failed to publish segments"); - throw new ISE( - "Failed to publish segments because of [%s]", - publishResult.getErrorMsg() + // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active + // now after all, for two possible reasons: + // + // 1) A replica may have beat us to publishing these segments. In this case we want to delete the + // segments we pushed (if they had unique paths) to avoid wasting space on deep storage. + // 2) We may have actually succeeded, but not realized it due to missing the confirmation response + // from the overlord. In this case we do not want to delete the segments we pushed, since they are + // now live! + + final Set segmentsIdentifiers = segmentsAndCommitMetadata + .getSegments() + .stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toSet()); + + final Set activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers); + + if (activeSegments.equals(ourSegments)) { + log.info( + "Could not publish [%s] segments, but checked and found them already published; continuing.", + ourSegments.size() + ); + log.infoSegments( + segmentsAndCommitMetadata.getSegments(), + "Could not publish segments" ); + + // Clean up pushed segments if they are physically disjoint from the published ones (this means + // they were probably pushed by a replica, and with the unique paths option). 
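The cleanup rule described in the comment above reduces to a set intersection over load specs, as the Sets.intersection call that follows shows: pushed files may be deleted only when they share no load spec with the segments that actually got published. A toy sketch of the same check with plain java.util collections (load specs are reduced to path strings here for illustration; the real ones are richer objects):

import java.util.HashSet;
import java.util.Set;

class DisjointCheck
{
  public static void main(String[] args)
  {
    Set<String> activeLoadSpecs = Set.of("s3://bucket/segment-a", "s3://bucket/segment-b");
    Set<String> ourLoadSpecs = Set.of("s3://bucket/segment-c");

    Set<String> overlap = new HashSet<>(activeLoadSpecs);
    overlap.retainAll(ourLoadSpecs);

    // Disjoint means the published segments do not reference our pushed files,
    // so deleting those files cannot touch live data.
    boolean physicallyDisjoint = overlap.isEmpty();
    System.out.println("physicallyDisjoint = " + physicallyDisjoint);
  }
}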
+ final boolean physicallyDisjoint = Sets.intersection( + activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()), + ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()) + ).isEmpty(); + + if (physicallyDisjoint) { + segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + } } else { log.errorSegments(ourSegments, "Failed to publish segments"); - throw new ISE("Failed to publish segments"); + if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains(("Aborting transaction!"))) { + throw new ISE(publishResult.getErrorMsg()); + } + // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error. + segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + + if (publishResult.getErrorMsg() != null) { + throw new ISE( + "Failed to publish segments because of [%s]", + publishResult.getErrorMsg() + ); + } else { + log.errorSegments(ourSegments, "Failed to publish segments"); + throw new ISE("Failed to publish segments"); + } } } } + catch (Exception e) { + // Must not remove segments here, we aren't sure if our transaction succeeded or not. + log.noStackTrace().warn(e, "Failed publish"); + log.warnSegments( + segmentsAndCommitMetadata.getSegments(), + "Failed publish, not removing segments" + ); + if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) { + // we can recover from this error. + throw new ExecutionException(e); + } else { + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } + } + return segmentsAndCommitMetadata; } - catch (Exception e) { - // Must not remove segments here, we aren't sure if our transaction succeeded or not. - log.noStackTrace().warn(e, "Failed publish"); - log.warnSegments( - segmentsAndCommitMetadata.getSegments(), - "Failed publish, not removing segments" - ); - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } - - return segmentsAndCommitMetadata; - } + ), + e -> (e instanceof ExecutionException), + RetryUtils.DEFAULT_MAX_TRIES ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 7a99f200bea6..15f075940ee4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -203,8 +203,7 @@ public ListenableFuture publishAll( @Nullable final Set tombstones, final TransactionalSegmentPublisher publisher, final Function, Set> outputSegmentsAnnotateFunction - ) - { + ) throws Exception { final Map snapshot; synchronized (segments) { snapshot = ImmutableMap.copyOf(segments); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 054de41f2559..f7af9611e51a 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -680,11 +680,10 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - 
Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1); // Should only be tried once. - // Now, we will retry for this test case as well, So it will be equal to total retries available which is 2. - Assert.assertEquals(2, metadataUpdateCounter.get()); + Assert.assertEquals(1, metadataUpdateCounter.get()); } @Test From 543a42ddd18a3878e35dc8772248e3a8e4245633 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 27 Dec 2023 11:39:29 +0530 Subject: [PATCH 19/25] Fix build --- .../java/org/apache/druid/indexing/common/task/IndexTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 288107d0b944..88ee3fbfe1d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -844,8 +844,7 @@ private TaskStatus generateAndPublishSegments( final InputSource inputSource, final File tmpDir, final PartitionAnalysis partitionAnalysis - ) throws IOException, InterruptedException - { + ) throws Exception { final FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null); FireDepartmentMetrics buildSegmentsFireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); From f7b9c63f45c175b9a21e1834e31a7fd15af49c10 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 27 Dec 2023 11:54:53 +0530 Subject: [PATCH 20/25] Fix build --- .../segment/realtime/appenderator/BaseAppenderatorDriver.java | 3 ++- .../segment/realtime/appenderator/BatchAppenderatorDriver.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 1340f337ce62..dd2259cd6e3e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -567,7 +567,8 @@ ListenableFuture publishInBackground( SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher publisher, java.util.function.Function, Set> outputSegmentsAnnotateFunction - ) throws Exception { + ) throws Exception + { final Set pushedAndTombstones = new HashSet<>(segmentsAndCommitMetadata.getSegments()); if (tombstones != null) { pushedAndTombstones.addAll(tombstones); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 15f075940ee4..d3ea7d562154 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -203,7 +203,8 @@ public ListenableFuture publishAll( @Nullable final Set tombstones, final TransactionalSegmentPublisher publisher, final Function, Set> outputSegmentsAnnotateFunction - ) throws Exception { + ) throws Exception + { 
final Map snapshot; synchronized (segments) { snapshot = ImmutableMap.copyOf(segments); From 14320373f94242164bb8118400eef87c847c6c70 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Wed, 27 Dec 2023 13:00:21 +0530 Subject: [PATCH 21/25] Fix build --- .../org/apache/druid/indexing/common/task/IndexTask.java | 3 ++- .../actions/SegmentTransactionalInsertActionTest.java | 9 +-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 88ee3fbfe1d6..6b73b43b20e1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -844,7 +844,8 @@ private TaskStatus generateAndPublishSegments( final InputSource inputSource, final File tmpDir, final PartitionAnalysis partitionAnalysis - ) throws Exception { + ) throws Exception + { final FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null); FireDepartmentMetrics buildSegmentsFireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index b54b9b0d912e..5f50e4abf532 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -29,24 +29,17 @@ import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; -import org.apache.druid.indexing.overlord.config.TaskLockConfig; -import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; -import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; public class SegmentTransactionalInsertActionTest { @@ -190,7 +183,7 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result); } @Test From 5b8519c822c56c358a19acfc6362bef8ab51f52b Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Thu, 4 Jan 2024 14:48:29 +0530 Subject: [PATCH 22/25] Fix retry Logic --- .../druid/indexing/common/task/IndexTask.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 176 +++++++++--------- 
.../appenderator/BatchAppenderatorDriver.java | 2 +- 3 files changed, 91 insertions(+), 89 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 6b73b43b20e1..288107d0b944 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -844,7 +844,7 @@ private TaskStatus generateAndPublishSegments( final InputSource inputSource, final File tmpDir, final PartitionAnalysis partitionAnalysis - ) throws Exception + ) throws IOException, InterruptedException { final FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index dd2259cd6e3e..372a6e2a1190 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -567,7 +567,7 @@ ListenableFuture publishInBackground( SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher publisher, java.util.function.Function, Set> outputSegmentsAnnotateFunction - ) throws Exception + ) { final Set pushedAndTombstones = new HashSet<>(segmentsAndCommitMetadata.getSegments()); if (tombstones != null) { @@ -597,104 +597,106 @@ ListenableFuture publishInBackground( final Object callerMetadata = metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); - return RetryUtils.retry( - () -> executor.submit( - () -> { - try { - final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); - final SegmentPublishResult publishResult = publisher.publishSegments( - segmentsToBeOverwritten, - segmentsToBeDropped, - ourSegments, - outputSegmentsAnnotateFunction, - callerMetadata - ); - - if (publishResult.isSuccess()) { - log.info( - "Published [%s] segments with commit metadata [%s]", - segmentsAndCommitMetadata.getSegments().size(), + return executor.submit( + () -> RetryUtils.retry( + () -> { + try { + final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); + final SegmentPublishResult publishResult = publisher.publishSegments( + segmentsToBeOverwritten, + segmentsToBeDropped, + ourSegments, + outputSegmentsAnnotateFunction, callerMetadata ); - log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); - } else { - // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active - // now after all, for two possible reasons: - // - // 1) A replica may have beat us to publishing these segments. In this case we want to delete the - // segments we pushed (if they had unique paths) to avoid wasting space on deep storage. - // 2) We may have actually succeeded, but not realized it due to missing the confirmation response - // from the overlord. In this case we do not want to delete the segments we pushed, since they are - // now live! 
- - final Set segmentsIdentifiers = segmentsAndCommitMetadata - .getSegments() - .stream() - .map(SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toSet()); - - final Set activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers); - - if (activeSegments.equals(ourSegments)) { + + if (publishResult.isSuccess()) { log.info( - "Could not publish [%s] segments, but checked and found them already published; continuing.", - ourSegments.size() - ); - log.infoSegments( - segmentsAndCommitMetadata.getSegments(), - "Could not publish segments" + "Published [%s] segments with commit metadata [%s]", + segmentsAndCommitMetadata.getSegments().size(), + callerMetadata ); - - // Clean up pushed segments if they are physically disjoint from the published ones (this means - // they were probably pushed by a replica, and with the unique paths option). - final boolean physicallyDisjoint = Sets.intersection( - activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()), - ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()) - ).isEmpty(); - - if (physicallyDisjoint) { - segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - } + log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); } else { - log.errorSegments(ourSegments, "Failed to publish segments"); - if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains(("Aborting transaction!"))) { - throw new ISE(publishResult.getErrorMsg()); - } - // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error. - segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); - - if (publishResult.getErrorMsg() != null) { - throw new ISE( - "Failed to publish segments because of [%s]", - publishResult.getErrorMsg() + // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active + // now after all, for two possible reasons: + // + // 1) A replica may have beat us to publishing these segments. In this case we want to delete the + // segments we pushed (if they had unique paths) to avoid wasting space on deep storage. + // 2) We may have actually succeeded, but not realized it due to missing the confirmation response + // from the overlord. In this case we do not want to delete the segments we pushed, since they are + // now live! + + final Set segmentsIdentifiers = segmentsAndCommitMetadata + .getSegments() + .stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toSet()); + + final Set activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers); + + if (activeSegments.equals(ourSegments)) { + log.info( + "Could not publish [%s] segments, but checked and found them already published; continuing.", + ourSegments.size() + ); + log.infoSegments( + segmentsAndCommitMetadata.getSegments(), + "Could not publish segments" ); + + // Clean up pushed segments if they are physically disjoint from the published ones (this means + // they were probably pushed by a replica, and with the unique paths option). 
+ final boolean physicallyDisjoint = Sets.intersection( + activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()), + ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()) + ).isEmpty(); + + if (physicallyDisjoint) { + segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + } } else { log.errorSegments(ourSegments, "Failed to publish segments"); - throw new ISE("Failed to publish segments"); + if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains(("Aborting transaction!"))) { + log.info("Got the Error, Aborting Transaction!"); + throw new ISE(publishResult.getErrorMsg()); + } + // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error. + segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + + if (publishResult.getErrorMsg() != null) { + throw new ISE( + "Failed to publish segments because of [%s]", + publishResult.getErrorMsg() + ); + } else { + log.errorSegments(ourSegments, "Failed to publish segments"); + throw new ISE("Failed to publish segments"); + } } } } - } - catch (Exception e) { - // Must not remove segments here, we aren't sure if our transaction succeeded or not. - log.noStackTrace().warn(e, "Failed publish"); - log.warnSegments( - segmentsAndCommitMetadata.getSegments(), - "Failed publish, not removing segments" - ); - if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) { - // we can recover from this error. - throw new ExecutionException(e); - } else { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); + catch (Exception e) { + // Must not remove segments here, we aren't sure if our transaction succeeded or not. + log.noStackTrace().warn(e, "Failed publish"); + log.warnSegments( + segmentsAndCommitMetadata.getSegments(), + "Failed publish, not removing segments" + ); + if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) { + // we can recover from this error. 
+ log.info("Retrying for this case"); + throw new ExecutionException(e); + } else { + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } } - } - return segmentsAndCommitMetadata; - } - ), - e -> (e instanceof ExecutionException), - RetryUtils.DEFAULT_MAX_TRIES + return segmentsAndCommitMetadata; + }, + e -> (e instanceof ExecutionException), + RetryUtils.DEFAULT_MAX_TRIES + ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index d3ea7d562154..7a99f200bea6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -203,7 +203,7 @@ public ListenableFuture publishAll( @Nullable final Set tombstones, final TransactionalSegmentPublisher publisher, final Function, Set> outputSegmentsAnnotateFunction - ) throws Exception + ) { final Map snapshot; synchronized (segments) { From b4318d6af1b21b46085e912ca64935f7ca36067a Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Mon, 8 Jan 2024 15:25:41 +0530 Subject: [PATCH 23/25] Addressing review comments --- .../kafka/KafkaDataSourceMetadata.java | 2 + .../SeekableStreamEndSequenceNumbers.java | 23 ++++++----- .../SeekableStreamStartSequenceNumbers.java | 23 ++++++----- .../SeekableStreamEndSequenceNumbersTest.java | 37 +++++++++++++++++ ...eekableStreamStartSequenceNumbersTest.java | 41 +++++++++++++++++++ 5 files changed, 106 insertions(+), 20 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index d2eb9f0c2261..427a6e0e8748 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -63,6 +63,8 @@ protected SeekableStreamDataSourceMetadata createConcreteDataSour } @Override + // This method is to compare KafkaDataSourceMetadata. + // It compares this and other SeekableStreamSequenceNumbers using naturalOrder comparator. public int compareTo(KafkaDataSourceMetadata other) { if (!getClass().equals(other.getClass())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index 0d682f752032..7c23a9da1297 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; /** * Represents the end sequenceNumber per partition of a sequence. 
Note that end sequenceNumbers are always @@ -237,15 +236,19 @@ public int compareTo(SeekableStreamSequenceNumbers res = new AtomicReference<>(false); - partitionSequenceNumberMap.forEach( - (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { - res.set(true); - } - } - ); - if (res.get()) { + boolean res = false; + for (Map.Entry entry : partitionSequenceNumberMap.entrySet()) { + PartitionIdType partitionId = entry.getKey(); + SequenceOffsetType sequenceOffset = entry.getValue(); + if (!otherStart.partitionSequenceNumberMap.containsKey(partitionId)) { + throw new IAE("Partition[%s] is not present in the last committed metadata", partitionId); + } + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { + res = true; + break; + } + } + if (res) { return 1; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index dfcaf3010924..8b07d7b8f61c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -33,7 +33,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; /** * Represents the start sequenceNumber per partition of a sequence. This class keeps an additional set of @@ -258,15 +257,19 @@ public int compareTo(SeekableStreamSequenceNumbers res = new AtomicReference<>(false); - partitionSequenceNumberMap.forEach( - (partitionId, sequenceOffset) -> { - if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { - res.set(true); - } - } - ); - if (res.get()) { + boolean res = false; + for (Map.Entry entry : partitionSequenceNumberMap.entrySet()) { + PartitionIdType partitionId = entry.getKey(); + SequenceOffsetType sequenceOffset = entry.getValue(); + if (!otherStart.partitionSequenceNumberMap.containsKey(partitionId)) { + throw new IAE("Partition[%s] is not present in the last committed metadata", partitionId); + } + if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { + res = true; + break; + } + } + if (res) { return 1; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java index 691f579d8e60..9cf29d6cbd25 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Comparator; import java.util.Map; public class SeekableStreamEndSequenceNumbersTest @@ -95,4 +96,40 @@ public void testConvertToStart()
endSequenceNumbers.asStartPartitions(true) ); } + + @Test + public void testCompareToWithTrueResult() + { + final String stream = "theStream"; + final Map offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L); + final SeekableStreamEndSequenceNumbers partitions1 = new SeekableStreamEndSequenceNumbers<>( + stream, + offsetMap1 + ); + + final Map offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L); + final SeekableStreamEndSequenceNumbers partitions2 = new SeekableStreamEndSequenceNumbers<>( + stream, + offsetMap2 + ); + Assert.assertEquals(1, partitions1.compareTo(partitions2, Comparator.naturalOrder())); + } + + @Test + public void testCompareToWithFalseResult() + { + final String stream = "theStream"; + final Map offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L); + final SeekableStreamEndSequenceNumbers partitions1 = new SeekableStreamEndSequenceNumbers<>( + stream, + offsetMap1 + ); + + final Map offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L); + final SeekableStreamEndSequenceNumbers partitions2 = new SeekableStreamEndSequenceNumbers<>( + stream, + offsetMap2 + ); + Assert.assertEquals(0, partitions1.compareTo(partitions2, Comparator.naturalOrder())); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java index f4342e27acbc..f632cf61ecf2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Comparator; import java.util.Map; public class SeekableStreamStartSequenceNumbersTest @@ -74,4 +75,44 @@ public void testSerde() throws Exception OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference>() {}) ); } + + @Test + public void testCompareToWithTrueResult() + { + final String stream = "theStream"; + final Map offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L); + final SeekableStreamStartSequenceNumbers partitions1 = new SeekableStreamStartSequenceNumbers<>( + stream, + offsetMap1, + ImmutableSet.of(6) + ); + + final Map offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L); + final SeekableStreamStartSequenceNumbers partitions2 = new SeekableStreamStartSequenceNumbers<>( + stream, + offsetMap2, + ImmutableSet.of(6) + ); + Assert.assertEquals(1, partitions1.compareTo(partitions2, Comparator.naturalOrder())); + } + + @Test + public void testCompareToWithFalseResult() + { + final String stream = "theStream"; + final Map offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L); + final SeekableStreamStartSequenceNumbers partitions1 = new SeekableStreamStartSequenceNumbers<>( + stream, + offsetMap1, + ImmutableSet.of(6) + ); + + final Map offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L); + final SeekableStreamStartSequenceNumbers partitions2 = new SeekableStreamStartSequenceNumbers<>( + stream, + offsetMap2, + ImmutableSet.of(6) + ); + Assert.assertEquals(0, partitions1.compareTo(partitions2, Comparator.naturalOrder())); + } } From ff387c12c8f450a0013ebc108764bf742ad76f7a Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Mon, 8 Jan 2024 17:10:22 +0530 Subject: [PATCH 24/25] Fixing test cases --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../apache/druid/indexing/kinesis/KinesisIndexTaskTest.java | 2 +- 
.../seekablestream/SeekableStreamEndSequenceNumbers.java | 3 --- .../seekablestream/SeekableStreamStartSequenceNumbers.java | 3 --- 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 3a9000c3d6e9..6499094d3035 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1861,7 +1861,7 @@ public void testRunReplicas() throws Exception ); } - @Test(timeout = 60_000L) + @Test public void testRunConflicting() throws Exception { final KafkaIndexTask task1 = createTask( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 99f939433f7d..ee805469016a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -1703,7 +1703,7 @@ public void testRunReplicas() throws Exception } - @Test(timeout = 120_000L) + @Test public void testRunConflicting() throws Exception { recordSupplier.assign(EasyMock.anyObject()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index 7c23a9da1297..7aa3995dc70f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -240,9 +240,6 @@ public int compareTo(SeekableStreamSequenceNumbers entry : partitionSequenceNumberMap.entrySet()) { PartitionIdType partitionId = entry.getKey(); SequenceOffsetType sequenceOffset = entry.getValue(); - if (!otherStart.partitionSequenceNumberMap.containsKey(partitionId)) { - throw new IAE("Partition[%s] is not present in the last committed metadata", partitionId); - } if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { res = true; break; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index 8b07d7b8f61c..a3bc01dcb2ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -261,9 +261,6 @@ public int compareTo(SeekableStreamSequenceNumbers entry : partitionSequenceNumberMap.entrySet()) { PartitionIdType partitionId = entry.getKey(); SequenceOffsetType sequenceOffset = entry.getValue(); - if (!otherStart.partitionSequenceNumberMap.containsKey(partitionId)) { - throw new IAE("Partition[%s] is not present in the last committed metadata", partitionId); - } if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { res = true; break;
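After this patch, the per-partition comparison in both sequence-number classes reads the same way: walk this object's partition-to-offset map and report 1 as soon as any offset is ahead of the other side's offset for the same partition, otherwise report 0. A minimal, self-contained sketch of that semantics over plain maps (the class name, partition ids, and offsets below are illustrative stand-ins, not the Druid classes):

import java.util.Comparator;
import java.util.Map;

class OffsetComparison
{
  // Returns 1 if any partition in "ours" is ahead of the matching partition in "theirs", else 0.
  static <P, S> int compareOffsets(Map<P, S> ours, Map<P, S> theirs, Comparator<S> comparator)
  {
    for (Map.Entry<P, S> entry : ours.entrySet()) {
      S theirOffset = theirs.get(entry.getKey());
      if (theirOffset != null && comparator.compare(entry.getValue(), theirOffset) > 0) {
        return 1;
      }
    }
    return 0;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> ours = Map.of(1, 5L, 2, 6L);
    Map<Integer, Long> theirs = Map.of(1, 4L, 2, 4L);
    // Prints 1: partition 1 is ahead (5 > 4), matching the expectations in the new unit tests.
    System.out.println(compareOffsets(ours, theirs, Comparator.naturalOrder()));
  }
}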
old commited metadata", partitionId); - } if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { res = true; break; From 74db29085e2017321e26d938e7c268771bfa6c57 Mon Sep 17 00:00:00 2001 From: Pankaj kumar Date: Tue, 9 Jan 2024 22:17:04 +0530 Subject: [PATCH 25/25] Adding retry in try/catch block --- .../appenderator/BaseAppenderatorDriver.java | 194 +++++++++--------- 1 file changed, 100 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 372a6e2a1190..d9dd1a0b10fa 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -62,7 +62,6 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -598,105 +597,112 @@ ListenableFuture publishInBackground( ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); return executor.submit( - () -> RetryUtils.retry( - () -> { - try { - final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); - final SegmentPublishResult publishResult = publisher.publishSegments( - segmentsToBeOverwritten, - segmentsToBeDropped, - ourSegments, - outputSegmentsAnnotateFunction, - callerMetadata - ); - - if (publishResult.isSuccess()) { - log.info( - "Published [%s] segments with commit metadata [%s]", - segmentsAndCommitMetadata.getSegments().size(), - callerMetadata - ); - log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); - } else { - // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active - // now after all, for two possible reasons: - // - // 1) A replica may have beat us to publishing these segments. In this case we want to delete the - // segments we pushed (if they had unique paths) to avoid wasting space on deep storage. - // 2) We may have actually succeeded, but not realized it due to missing the confirmation response - // from the overlord. In this case we do not want to delete the segments we pushed, since they are - // now live! 
From 74db29085e2017321e26d938e7c268771bfa6c57 Mon Sep 17 00:00:00 2001
From: Pankaj kumar
Date: Tue, 9 Jan 2024 22:17:04 +0530
Subject: [PATCH 25/25] Adding retry in try/catch block

---
 .../appenderator/BaseAppenderatorDriver.java  | 194 +++++++++---------
 1 file changed, 100 insertions(+), 94 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 372a6e2a1190..d9dd1a0b10fa 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -62,7 +62,6 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -598,105 +597,112 @@ ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
         ? null
         : ((AppenderatorDriverMetadata) metadata).getCallerMetadata();
 
     return executor.submit(
-        () -> RetryUtils.retry(
-            () -> {
-              try {
-                final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
-                final SegmentPublishResult publishResult = publisher.publishSegments(
-                    segmentsToBeOverwritten,
-                    segmentsToBeDropped,
-                    ourSegments,
-                    outputSegmentsAnnotateFunction,
-                    callerMetadata
-                );
-
-                if (publishResult.isSuccess()) {
-                  log.info(
-                      "Published [%s] segments with commit metadata [%s]",
-                      segmentsAndCommitMetadata.getSegments().size(),
-                      callerMetadata
-                  );
-                  log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
-                } else {
-                  // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
-                  // now after all, for two possible reasons:
-                  //
-                  // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
-                  //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
-                  // 2) We may have actually succeeded, but not realized it due to missing the confirmation response
-                  //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
-                  //    now live!
-
-                  final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
-                      .getSegments()
-                      .stream()
-                      .map(SegmentIdWithShardSpec::fromDataSegment)
-                      .collect(Collectors.toSet());
-
-                  final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-                  if (activeSegments.equals(ourSegments)) {
-                    log.info(
-                        "Could not publish [%s] segments, but checked and found them already published; continuing.",
-                        ourSegments.size()
-                    );
-                    log.infoSegments(
-                        segmentsAndCommitMetadata.getSegments(),
-                        "Could not publish segments"
-                    );
-
-                    // Clean up pushed segments if they are physically disjoint from the published ones (this means
-                    // they were probably pushed by a replica, and with the unique paths option).
-                    final boolean physicallyDisjoint = Sets.intersection(
-                        activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-                        ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
-                    ).isEmpty();
-
-                    if (physicallyDisjoint) {
-                      segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-                    }
-                  } else {
-                    log.errorSegments(ourSegments, "Failed to publish segments");
-                    if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains(("Aborting transaction!"))) {
-                      log.info("Got the Error, Aborting Transaction!");
-                      throw new ISE(publishResult.getErrorMsg());
-                    }
-                    // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error.
-                    segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
-                    if (publishResult.getErrorMsg() != null) {
-                      throw new ISE(
-                          "Failed to publish segments because of [%s]",
-                          publishResult.getErrorMsg()
-                      );
-                    } else {
-                      log.errorSegments(ourSegments, "Failed to publish segments");
-                      throw new ISE("Failed to publish segments");
-                    }
-                  }
-                }
-              }
-              catch (Exception e) {
-                // Must not remove segments here, we aren't sure if our transaction succeeded or not.
-                log.noStackTrace().warn(e, "Failed publish");
-                log.warnSegments(
-                    segmentsAndCommitMetadata.getSegments(),
-                    "Failed publish, not removing segments"
-                );
-                if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) {
-                  // we can recover from this error.
-                  log.info("Retrying for this case");
-                  throw new ExecutionException(e);
-                } else {
-                  Throwables.propagateIfPossible(e);
-                  throw new RuntimeException(e);
-                }
-              }
-              return segmentsAndCommitMetadata;
-            },
-            e -> (e instanceof ExecutionException),
-            RetryUtils.DEFAULT_MAX_TRIES
-        )
+        () -> {
+          try {
+            RetryUtils.retry(
+                () -> {
+                  try {
+                    final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
+                    final SegmentPublishResult publishResult = publisher.publishSegments(
+                        segmentsToBeOverwritten,
+                        segmentsToBeDropped,
+                        ourSegments,
+                        outputSegmentsAnnotateFunction,
+                        callerMetadata
+                    );
+
+                    if (publishResult.isSuccess()) {
+                      log.info(
+                          "Published [%s] segments with commit metadata [%s]",
+                          segmentsAndCommitMetadata.getSegments().size(),
+                          callerMetadata
+                      );
+                      log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
+                    } else {
+                      // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
+                      // now after all, for two possible reasons:
+                      //
+                      // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
+                      //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
+                      // 2) We may have actually succeeded, but not realized it due to missing the confirmation response
+                      //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
+                      //    now live!
+
+                      final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
+                          .getSegments()
+                          .stream()
+                          .map(SegmentIdWithShardSpec::fromDataSegment)
+                          .collect(Collectors.toSet());
+
+                      final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                      if (activeSegments.equals(ourSegments)) {
+                        log.info(
+                            "Could not publish [%s] segments, but checked and found them already published; continuing.",
+                            ourSegments.size()
+                        );
+                        log.infoSegments(
+                            segmentsAndCommitMetadata.getSegments(),
+                            "Could not publish segments"
+                        );
+
+                        // Clean up pushed segments if they are physically disjoint from the published ones (this means
+                        // they were probably pushed by a replica, and with the unique paths option).
+                        final boolean physicallyDisjoint = Sets.intersection(
+                            activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                            ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                        ).isEmpty();
+
+                        if (physicallyDisjoint) {
+                          segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                        }
+                      } else {
+                        log.errorSegments(ourSegments, "Failed to publish segments");
+                        if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains("Aborting transaction!")) {
+                          log.info("Got 'Aborting transaction!' error, the publish will be retried");
+                          throw new ISE(publishResult.getErrorMsg());
+                        }
+                        // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error.
+                        segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
+                        if (publishResult.getErrorMsg() != null) {
+                          throw new ISE(
+                              "Failed to publish segments because of [%s]",
+                              publishResult.getErrorMsg()
+                          );
+                        } else {
+                          log.errorSegments(ourSegments, "Failed to publish segments");
+                          throw new ISE("Failed to publish segments");
+                        }
+                      }
+                    }
+                  }
+                  catch (Exception e) {
+                    // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+                    log.noStackTrace().warn(e, "Failed publish");
+                    log.warnSegments(
+                        segmentsAndCommitMetadata.getSegments(),
+                        "Failed publish, not removing segments"
+                    );
+                    Throwables.propagateIfPossible(e);
+                    throw new RuntimeException(e);
+                  }
+                  return segmentsAndCommitMetadata;
+                },
+                e -> (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")),
+                RetryUtils.DEFAULT_MAX_TRIES
+            );
+          }
+          catch (Exception e) {
+            if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) {
+              // Publish failed for some reason. Clean them up and then throw an error.
+              segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+            }
+            Throwables.propagateIfPossible(e);
+            throw new RuntimeException(e);
+          }
+          return segmentsAndCommitMetadata;
+        }
     );
   }
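
Taken together, the control flow of the final patch is: retry the publish while the failure message matches the retryable metadata-conflict error, and clean up the pushed segments only once the retries are exhausted. A condensed sketch of that retry shape in plain Java (only the (task, shouldRetry, maxTries) shape of RetryUtils.retry is taken from the diff above; the stand-in implementation and names below are hypothetical):

    import java.util.concurrent.Callable;
    import java.util.function.Predicate;

    public final class PublishRetrySketch
    {
      // Stand-in for RetryUtils.retry: run `task`, retrying while `shouldRetry`
      // matches the thrown exception, up to `maxTries` attempts in total.
      static <T> T retry(Callable<T> task, Predicate<Throwable> shouldRetry, int maxTries) throws Exception
      {
        for (int attempt = 1; ; attempt++) {
          try {
            return task.call();
          }
          catch (Exception e) {
            if (attempt >= maxTries || !shouldRetry.test(e)) {
              throw e; // not retryable, or out of tries: the caller's catch cleans up
            }
          }
        }
      }

      public static void main(String[] args) throws Exception
      {
        final int[] calls = {0};
        final String result = retry(
            () -> {
              if (++calls[0] < 3) {
                throw new RuntimeException("Aborting transaction!"); // transient metadata conflict
              }
              return "published";
            },
            e -> e.getMessage() != null && e.getMessage().contains("Aborting transaction!"),
            5
        );
        System.out.println(result + " after " + calls[0] + " attempts"); // published after 3 attempts
      }
    }

One design consequence worth noting: because the retry predicate matches on the exception message, the inner block deliberately rethrows the publish failure with the raw error text intact, so only the metadata-conflict case is retried and every other failure falls through to the cleanup path immediately.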