diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
index 81ea6de396b4..427a6e0e8748 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -25,10 +25,14 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.Comparator;
 
-public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<Integer, Long>
+public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<Integer, Long> implements Comparable<KafkaDataSourceMetadata>
 {
   @JsonCreator
   public KafkaDataSourceMetadata(
       @JsonProperty("partitions") SeekableStreamSequenceNumbers<Integer, Long> kafkaPartitions
@@ -57,4 +61,19 @@ protected SeekableStreamDataSourceMetadata createConcreteDataSour
   {
     return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers);
   }
+
+  /**
+   * Compares this KafkaDataSourceMetadata with another by comparing the underlying
+   * SeekableStreamSequenceNumbers with the natural-order comparator.
+   */
+  @Override
+  public int compareTo(KafkaDataSourceMetadata other)
+  {
+    if (!getClass().equals(other.getClass())) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+    return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder());
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 3a9000c3d6e9..6499094d3035 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1861,7 +1861,7 @@ public void testRunReplicas() throws Exception
     );
   }
 
-  @Test(timeout = 60_000L)
+  @Test
   public void testRunConflicting() throws Exception
   {
     final KafkaIndexTask task1 = createTask(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 99f939433f7d..ee805469016a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -1703,7 +1703,7 @@ public void testRunReplicas() throws Exception
   }
 
-  @Test(timeout = 120_000L)
+  @Test
   public void testRunConflicting() throws Exception
   {
     recordSupplier.assign(EasyMock.anyObject());
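For orientation before the remaining files: a minimal, hypothetical sketch of the comparison semantics the new compareTo is meant to provide. The topic name, partition ids, and offsets below are invented; this snippet is not part of the patch.

    // Returns 1 when at least one partition offset on the left is ahead of its
    // counterpart on the right; returns 0 in every other case.
    KafkaDataSourceMetadata committed = new KafkaDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L, 1, 30L))
    );
    KafkaDataSourceMetadata incoming = new KafkaDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 20L, 1, 30L))
    );
    int ahead = incoming.compareTo(committed);    // 1: partition 0 moved from 10 to 20
    int notAhead = committed.compareTo(incoming); // 0: nothing in `committed` is ahead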
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index 3f2f7bfe1bc0..7aa3995dc70f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -25,6 +25,7 @@
 import org.apache.druid.java.util.common.IAE;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -218,4 +219,36 @@ public String toString()
            ", partitionSequenceNumberMap=" + partitionSequenceNumberMap +
            '}';
   }
+
+  @Override
+  public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+
+    final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
+        (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
+
+    if (stream.equals(otherEnd.stream)) {
+      // Same stream: return 1 as soon as any partition's offset in this instance is
+      // ahead of its counterpart in the other.
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : partitionSequenceNumberMap.entrySet()) {
+        final SequenceOffsetType otherOffset = otherEnd.partitionSequenceNumberMap.get(entry.getKey());
+        if (otherOffset != null && comparator.compare(entry.getValue(), otherOffset) > 0) {
+          return 1;
+        }
+      }
+    }
+    return 0;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
index a790974e25f1..44f1343e25d8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 
+import java.util.Comparator;
 import java.util.Map;
 
 @JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class)
@@ -61,4 +62,11 @@ SeekableStreamSequenceNumbers plus(
   SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   );
+
+  /**
+   * Compares this instance's sequence offsets with the other's, using the given comparator.
+   * Returns 1 if any sequence offset in this instance is ahead of its counterpart in the
+   * other; otherwise returns 0.
+   */
+  int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator);
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index 52e5f31cf1d7..a3bc01dcb2ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -26,6 +26,7 @@
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -239,4 +240,36 @@ public String toString()
            ", exclusivePartitions=" + exclusivePartitions +
            '}';
   }
+
+  @Override
+  public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+
+    final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
+        (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
+
+    if (stream.equals(otherStart.stream)) {
+      // Same stream: return 1 as soon as any partition's offset in this instance is
+      // ahead of its counterpart in the other.
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : partitionSequenceNumberMap.entrySet()) {
+        final SequenceOffsetType otherOffset = otherStart.partitionSequenceNumberMap.get(entry.getKey());
+        if (otherOffset != null && comparator.compare(entry.getValue(), otherOffset) > 0) {
+          return 1;
+        }
+      }
+    }
+    return 0;
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
index 691f579d8e60..9cf29d6cbd25 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
@@ -28,6 +28,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.Map;
 
 public class SeekableStreamEndSequenceNumbersTest
@@ -95,4 +96,40 @@ public void testConvertToStart()
         endSequenceNumbers.asStartPartitions(true)
     );
   }
+
+  @Test
+  public void testCompareToWithTrueResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions1 = new SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap1
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions2 = new SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap2
+    );
+    Assert.assertEquals(1, partitions1.compareTo(partitions2, Comparator.naturalOrder()));
+  }
+
+  @Test
+  public void testCompareToWithFalseResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions1 = new SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap1
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions2 = new SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap2
+    );
+    Assert.assertEquals(0, partitions1.compareTo(partitions2, Comparator.naturalOrder()));
+  }
 }
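One caveat about these semantics, illustrated with a hypothetical snippet (offsets invented; not part of the patch): this compareTo is not antisymmetric, so it defines no total order. It only answers "is any partition here ahead of its counterpart there?", which is all the metadata-update check below needs.

    SeekableStreamEndSequenceNumbers<Integer, Long> a =
        new SeekableStreamEndSequenceNumbers<>("theStream", ImmutableMap.of(1, 5L, 2, 1L));
    SeekableStreamEndSequenceNumbers<Integer, Long> b =
        new SeekableStreamEndSequenceNumbers<>("theStream", ImmutableMap.of(1, 1L, 2, 5L));

    a.compareTo(b, Comparator.naturalOrder()); // 1: partition 1 is ahead in `a`
    b.compareTo(a, Comparator.naturalOrder()); // also 1: partition 2 is ahead in `b`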
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
index f4342e27acbc..f632cf61ecf2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
@@ -28,6 +28,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.Map;
 
 public class SeekableStreamStartSequenceNumbersTest
@@ -74,4 +75,44 @@ public void testSerde() throws Exception
         OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
     );
   }
+
+  @Test
+  public void testCompareToWithTrueResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions1 = new SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap1,
+        ImmutableSet.of(6)
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = new SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap2,
+        ImmutableSet.of(6)
+    );
+    Assert.assertEquals(1, partitions1.compareTo(partitions2, Comparator.naturalOrder()));
+  }
+
+  @Test
+  public void testCompareToWithFalseResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions1 = new SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap1,
+        ImmutableSet.of(6)
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = new SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap2,
+        ImmutableSet.of(6)
+    );
+    Assert.assertEquals(0, partitions1.compareTo(partitions2, Comparator.naturalOrder()));
+  }
 }
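A note on the constructor used in these tests: the third argument (ImmutableSet.of(6)) is the set of partition ids whose start offsets are exclusive. Per the implementation above, compareTo only inspects the partition-to-offset map and ignores this set. A hypothetical sketch with invented values:

    SeekableStreamStartSequenceNumbers<Integer, Long> start =
        new SeekableStreamStartSequenceNumbers<>(
            "theStream",
            ImmutableMap.of(1, 5L, 2, 6L),
            ImmutableSet.of(1)  // partition 1's start offset is exclusive
        );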
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index bb7759a8b5d6..a47b03b63a41 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1558,11 +1558,19 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
     }
 
     final boolean startMetadataMatchesExisting;
+    int startMetadataGreaterThanExisting = 0;
 
     if (oldCommitMetadataFromDb == null) {
       startMetadataMatchesExisting = startMetadata.isValidStart();
+      startMetadataGreaterThanExisting = 1;
     } else {
       // Checking against the last committed metadata.
+      // If the new start sequence numbers are ahead of the end sequence numbers of the last
+      // commit, compareTo() returns 1 (and 0 in all other cases). This can happen when
+      // multiple tasks publish sequences at around the same time.
+      if (startMetadata instanceof Comparable) {
+        startMetadataGreaterThanExisting =
+            ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+      }
+
       // Converting the last one into start metadata for checking since only the same type of metadata can be matched.
       // Even though kafka/kinesis indexing services use different sequenceNumber types for representing
       // start and end sequenceNumbers, the below conversion is fine because the new start sequenceNumbers are supposed
@@ -1570,6 +1578,17 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
       startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
 
+    if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
+      // The offsets in startMetadata are ahead of the last committed end state: most likely
+      // multiple replica tasks are racing to publish segments for the same partitions.
+      log.info(
+          "Failed to update the metadata store. The new start metadata [%s] is ahead of the last committed end state [%s].",
+          startMetadata,
+          oldCommitMetadataFromDb
+      );
+      return DataStoreMetadataUpdateResult.FAILURE;
+    }
+
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
       log.error(
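To make the new guard's outcomes concrete, a hypothetical decision-table sketch (the helper name is invented; this is not the actual coordinator code):

    // matches            -> proceed with the metadata update (unchanged happy path).
    // !matches, ahead==1 -> another task likely committed these offsets first; return
    //                       FAILURE so the transaction aborts and the caller can retry.
    // !matches, ahead==0 -> genuine mismatch; fall through to the pre-existing
    //                       "not in the desired start state" error handling.
    static String classifyMetadataUpdate(int ahead, boolean matches)
    {
      if (matches) {
        return "update";
      }
      return ahead == 1 ? "failure: already published" : "error: state mismatch";
    }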
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 65a8a676d3d2..d9dd1a0b10fa 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -38,6 +38,7 @@
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -598,88 +599,108 @@ ListenableFuture publishInBackground(
     return executor.submit(
         () -> {
           try {
-            final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
-            final SegmentPublishResult publishResult = publisher.publishSegments(
-                segmentsToBeOverwritten,
-                segmentsToBeDropped,
-                ourSegments,
-                outputSegmentsAnnotateFunction,
-                callerMetadata
-            );
-
-            if (publishResult.isSuccess()) {
-              log.info(
-                  "Published [%s] segments with commit metadata [%s]",
-                  segmentsAndCommitMetadata.getSegments().size(),
-                  callerMetadata
-              );
-              log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
-            } else {
-              // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
-              // now after all, for two possible reasons:
-              //
-              // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
-              //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
-              // 2) We may have actually succeeded, but not realized it due to missing the confirmation response
-              //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
-              //    now live!
-
-              final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
-                  .getSegments()
-                  .stream()
-                  .map(SegmentIdWithShardSpec::fromDataSegment)
-                  .collect(Collectors.toSet());
-
-              final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-              if (activeSegments.equals(ourSegments)) {
-                log.info(
-                    "Could not publish [%s] segments, but checked and found them already published; continuing.",
-                    ourSegments.size()
-                );
-                log.infoSegments(
-                    segmentsAndCommitMetadata.getSegments(),
-                    "Could not publish segments"
-                );
-
-                // Clean up pushed segments if they are physically disjoint from the published ones (this means
-                // they were probably pushed by a replica, and with the unique paths option).
-                final boolean physicallyDisjoint = Sets.intersection(
-                    activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-                    ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
-                ).isEmpty();
-
-                if (physicallyDisjoint) {
-                  segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-                }
-              } else {
-                // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error.
-                segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
-                if (publishResult.getErrorMsg() != null) {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE(
-                      "Failed to publish segments because of [%s]",
-                      publishResult.getErrorMsg()
-                  );
-                } else {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE("Failed to publish segments");
-                }
-              }
-            }
+            return RetryUtils.retry(
+                () -> {
+                  try {
+                    final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
+                    final SegmentPublishResult publishResult = publisher.publishSegments(
+                        segmentsToBeOverwritten,
+                        segmentsToBeDropped,
+                        ourSegments,
+                        outputSegmentsAnnotateFunction,
+                        callerMetadata
+                    );
+
+                    if (publishResult.isSuccess()) {
+                      log.info(
+                          "Published [%s] segments with commit metadata [%s]",
+                          segmentsAndCommitMetadata.getSegments().size(),
+                          callerMetadata
+                      );
+                      log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
+                    } else {
+                      // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
+                      // now after all, for two possible reasons:
+                      //
+                      // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
+                      //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
+                      // 2) We may have actually succeeded, but not realized it due to missing the confirmation response
+                      //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
+                      //    now live!
+
+                      final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
+                          .getSegments()
+                          .stream()
+                          .map(SegmentIdWithShardSpec::fromDataSegment)
+                          .collect(Collectors.toSet());
+
+                      final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                      if (activeSegments.equals(ourSegments)) {
+                        log.info(
+                            "Could not publish [%s] segments, but checked and found them already published; continuing.",
+                            ourSegments.size()
+                        );
+                        log.infoSegments(
+                            segmentsAndCommitMetadata.getSegments(),
+                            "Could not publish segments"
+                        );
+
+                        // Clean up pushed segments if they are physically disjoint from the published ones (this means
+                        // they were probably pushed by a replica, and with the unique paths option).
+                        final boolean physicallyDisjoint = Sets.intersection(
+                            activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                            ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                        ).isEmpty();
+
+                        if (physicallyDisjoint) {
+                          segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                        }
+                      } else {
+                        log.errorSegments(ourSegments, "Failed to publish segments");
+
+                        if (publishResult.getErrorMsg() != null && publishResult.getErrorMsg().contains("Aborting transaction!")) {
+                          // The transaction was aborted: do not clean up the pushed segments,
+                          // because the publish is going to be retried.
+                          log.info("Publish failed with [%s]; retrying.", publishResult.getErrorMsg());
+                          throw new ISE(publishResult.getErrorMsg());
+                        }
+
+                        // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error.
+                        segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
+                        if (publishResult.getErrorMsg() != null) {
+                          throw new ISE(
+                              "Failed to publish segments because of [%s]",
+                              publishResult.getErrorMsg()
+                          );
+                        } else {
+                          throw new ISE("Failed to publish segments");
+                        }
+                      }
+                    }
+                  }
+                  catch (Exception e) {
+                    // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+                    log.noStackTrace().warn(e, "Failed publish");
+                    log.warnSegments(
+                        segmentsAndCommitMetadata.getSegments(),
+                        "Failed publish, not removing segments"
+                    );
+                    Throwables.propagateIfPossible(e);
+                    throw new RuntimeException(e);
+                  }
+                  return segmentsAndCommitMetadata;
+                },
+                e -> e.getMessage() != null && e.getMessage().contains("Aborting transaction!"),
+                RetryUtils.DEFAULT_MAX_TRIES
+            );
           }
           catch (Exception e) {
-            // Must not remove segments here, we aren't sure if our transaction succeeded or not.
-            log.noStackTrace().warn(e, "Failed publish");
-            log.warnSegments(
-                segmentsAndCommitMetadata.getSegments(),
-                "Failed publish, not removing segments"
-            );
+            if (e.getMessage() != null && e.getMessage().contains("Aborting transaction!")) {
+              // Retries are exhausted and the transaction was still aborted: clean up the
+              // pushed segments before propagating the error.
+              segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+            }
             Throwables.propagateIfPossible(e);
             throw new RuntimeException(e);
           }
-          return segmentsAndCommitMetadata;
         }
     );
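Taken together, the driver now wraps the publish attempt in Druid's standard RetryUtils pattern. As a simplified, hypothetical illustration of that pattern (attemptPublish is an invented stand-in for the real publish lambda):

    // Retry the publish only when the failure message indicates an aborted
    // transaction (e.g. a concurrent replica advanced the committed offsets),
    // up to RetryUtils.DEFAULT_MAX_TRIES attempts.
    SegmentsAndCommitMetadata result = RetryUtils.retry(
        () -> attemptPublish(),
        e -> e.getMessage() != null && e.getMessage().contains("Aborting transaction!"),
        RetryUtils.DEFAULT_MAX_TRIES
    );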