Skip to content

Commit

Permalink
Merge branch 'master' into improve-test
Browse files Browse the repository at this point in the history
# Conflicts:
#	flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
#	flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
#	flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
#	flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
#	flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
  • Loading branch information
JNSimba committed Sep 10, 2024
2 parents 7518568 + d872f55 commit eb22704
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> restoreEnumera
SplitEnumeratorContext<DorisSourceSplit> context, PendingSplitsCheckpoint checkpoint)
throws Exception {
Collection<DorisSourceSplit> splits = checkpoint.getSplits();
LOG.info("Restore {} splits from checkpoint, detail {}", splits.size(), splits);
LOG.info("Restore splits from checkpoint, size {}, splits {}", splits.size(), splits);
DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits);
return new DorisSourceEnumerator(context, splitAssigner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void addSplits(Collection<DorisSourceSplit> splits) {

@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
LOG.info("Checkpointing splits: {}, id {}", splits, checkpointId);
LOG.info("Snapshot splits {} for checkpoint {}", splits, checkpointId);
return new PendingSplitsCheckpoint(splits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {

@Override
public void addReader(int subtaskId) {
LOG.info("Doris Source Enumerator adds reader: {}", subtaskId);
LOG.info("add reader: {}", subtaskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected void onSplitFinished(Map<String, DorisSourceSplitState> finishedSplitI

@Override
protected DorisSourceSplitState initializedState(DorisSourceSplit split) {
LOG.info("Initialized state for split: {}", split);
LOG.info("Initialized reader state for split: {}", split);
return new DorisSourceSplitState(split);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,9 @@ private DorisSplitRecords finishSplit() {
}
valueReader = null;
}
LOG.info("Finished split {}", currentSplitId);

final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId);
currentSplitId = null;
LOG.info("After Finished split {}, {} ", currentSplitId, finishRecords.finishedSplits());
return finishRecords;
}

Expand Down

0 comments on commit eb22704

Please sign in to comment.