Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing the Failure to TRY Again in case it fails to update the metadata. #158

Open
wants to merge 27 commits into
base: 25.0.0-confluent
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cfaac47
Changing the failure retry
Pankaj260100 Sep 28, 2023
a757692
Merge branch '25.0.0-confluent' into pankaj/OBSDATA-2313
Pankaj260100 Sep 28, 2023
eb11c43
Updating test cases
Pankaj260100 Sep 28, 2023
5581295
Updating test cases
Pankaj260100 Sep 28, 2023
5b77c2e
minor change
Pankaj260100 Sep 28, 2023
941d32a
Test case fix
Pankaj260100 Sep 28, 2023
9c5edc2
Changing the default number of retry to 2
Pankaj260100 Oct 4, 2023
c25f6b7
Adding test case
Pankaj260100 Oct 5, 2023
ccd9eb7
Changing the logic to retry in case of new startoffset is greater tha…
Pankaj260100 Oct 11, 2023
0425c89
minor change
Pankaj260100 Oct 11, 2023
2c8a32e
Fix test cases
Pankaj260100 Oct 11, 2023
34fc18c
Fix test cases
Pankaj260100 Oct 11, 2023
6964e0f
Fix test cases
Pankaj260100 Oct 11, 2023
08847bc
Using CompareTo function instead of adding new greaterThen
Pankaj260100 Oct 12, 2023
5b00e71
Addressing comments
Pankaj260100 Nov 14, 2023
1745550
fixing Indentation
Pankaj260100 Nov 14, 2023
5a3869a
minor change
Pankaj260100 Nov 14, 2023
f5a4ed2
Fix checkstyle
Pankaj260100 Nov 15, 2023
24b86fd
updating PR as per upstream PR
Pankaj260100 Dec 20, 2023
543a42d
Fix build
Pankaj260100 Dec 27, 2023
f7b9c63
Fix build
Pankaj260100 Dec 27, 2023
1432037
Fix build
Pankaj260100 Dec 27, 2023
a8c24cd
Merge branch '25.0.0-confluent' into pankaj/OBSDATA-2313
Pankaj260100 Dec 28, 2023
5b8519c
Fix retry Logic
Pankaj260100 Jan 4, 2024
b4318d6
Addressing review comments
Pankaj260100 Jan 8, 2024
ff387c1
Fixing test cases
Pankaj260100 Jan 8, 2024
74db290
Adding retry in try/catch block
Pankaj260100 Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing comments
Pankaj260100 committed Nov 14, 2023
commit 5b00e716f26b818ecf7ea729c8d161e7d8bfc874
Original file line number Diff line number Diff line change
@@ -135,10 +135,4 @@ public String toString()
", metrics=" + metrics +
'}';
}

@Override
public int compareTo(DataSourceMetadata o)
{
return -1;
}
}
Original file line number Diff line number Diff line change
@@ -25,9 +25,12 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.java.util.common.IAE;
import java.util.Comparator;


public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements Comparable<KafkaDataSourceMetadata> {

public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<Integer, Long>
{

@JsonCreator
public KafkaDataSourceMetadata(
@@ -57,4 +60,17 @@ protected SeekableStreamDataSourceMetadata<Integer, Long> createConcreteDataSour
{
return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers);
}

@Override
public int compareTo(KafkaDataSourceMetadata other)
{
if (!getClass().equals(other.getClass())) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder());
}
}
Original file line number Diff line number Diff line change
@@ -59,18 +59,6 @@ public boolean matches(DataSourceMetadata other)
return plus(other).equals(other.plus(this));
}

@Override
public int compareTo(DataSourceMetadata other)
{
if (!getClass().equals(other.getClass())) {
return -1;
}
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> that =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) other;

return seekableStreamSequenceNumbers.compareTo(that.seekableStreamSequenceNumbers);
}

@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.IAE;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -221,7 +222,7 @@ public String toString()
}

@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other)
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
@@ -239,18 +240,15 @@ public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffs
AtomicReference<Boolean> res = new AtomicReference<>(false);
partitionSequenceNumberMap.forEach(
(partitionId, sequenceOffset) -> {
if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) {
if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) {
res.set(true);
}
}
);
if (res.get()) {
return 1;
}
return -1;
} else {
// Different streams
return -1;
}
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -25,14 +25,15 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.indexing.overlord.DataSourceMetadata;

import java.util.Comparator;
import java.util.Map;

@JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class)
@JsonSubTypes({
@Type(name = "start", value = SeekableStreamStartSequenceNumbers.class),
@Type(name = "end", value = SeekableStreamEndSequenceNumbers.class)
})
public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> extends Comparable<SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>>
public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
/**
* Returns the stream/topic name.
@@ -61,4 +62,11 @@ SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus(
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus(
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
);

/**
* Compare this and the other sequence offsets using comparator.
* Returns 1, if this sequence is ahead of the other.
* otherwise, Return 0
*/
int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> seekableStreamSequenceNumbers, Comparator<SequenceOffsetType> comparator);
}
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -242,7 +243,7 @@ public String toString()
}

@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other)
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
@@ -260,18 +261,15 @@ public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffs
AtomicReference<Boolean> res = new AtomicReference<>(false);
partitionSequenceNumberMap.forEach(
(partitionId, sequenceOffset) -> {
if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && Long.parseLong(String.valueOf(sequenceOffset)) > Long.parseLong(String.valueOf(otherStart.partitionSequenceNumberMap.get(partitionId)))) {
if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) {
res.set(true);
}
}
);
if (res.get()) {
return 1;
}
return -1;
} else {
// Different streams
return -1;
}
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "object", value = ObjectMetadata.class)
})
public interface DataSourceMetadata extends Comparable<DataSourceMetadata>
public interface DataSourceMetadata
{
/**
* Returns true if this instance should be considered a valid starting point for a new dataSource that has
Original file line number Diff line number Diff line change
@@ -96,10 +96,4 @@ public String toString()
"theObject=" + theObject +
'}';
}

@Override
public int compareTo(DataSourceMetadata o)
{
return -1;
}
}
Original file line number Diff line number Diff line change
@@ -1558,16 +1558,18 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
}

final boolean startMetadataMatchesExisting;
final int startMetadataGreaterThanExisting;
int startMetadataGreaterThanExisting = 0;

if (oldCommitMetadataFromDb == null) {
startMetadataMatchesExisting = startMetadata.isValidStart();
startMetadataGreaterThanExisting = 1;
} else {
// Checking against the last committed metadata.
// If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1,
// -1 in other cases. It might be because multiple tasks are publishing the sequence at around same time.
startMetadataGreaterThanExisting = startMetadata.asStartMetadata().compareTo(oldCommitMetadataFromDb.asStartMetadata());
// 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time.
if (startMetadata instanceof Comparable) {
startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
}

// Converting the last one into start metadata for checking since only the same type of metadata can be matched.
// Even though kafka/kinesis indexing services use different sequenceNumber types for representing
@@ -1580,12 +1582,13 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
// Offset stored in StartMetadata is Greater than the last commited metadata,
// Then retry multiple task might be trying to publish the segment for same partitions.
log.info(
"Retrying to update metadata, existing state[%s] in metadata store is behind the new start state[%s].",
oldCommitMetadataFromDb,
startMetadata
"Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].",
oldCommitMetadataFromDb,
startMetadata
);
return DataStoreMetadataUpdateResult.TRY_AGAIN;
}

if (!startMetadataMatchesExisting) {
// Not in the desired start state.
log.error(