Skip to content

Commit

Permalink
modify config default value, solve conflict.
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeCooker17 committed Sep 13, 2023
1 parent fc5cf99 commit 31db70e
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public interface ConfigurationOptions {
int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;

String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000;

String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "";


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private[spark] object Utils {
}

def shouldBlock(exception: String): Boolean = {
blockTriggerKeysArray.exists(exception.contains)
blockTriggerKeysArray.nonEmpty && blockTriggerKeysArray.exists(exception.contains)
}

val result = Try(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {

var resultRdd = dataFrame.queryExecution.toRdd
val schema = dataFrame.schema
val dfColumns = dataFrame.columns
if (Objects.nonNull(sinkTaskPartitionSize)) {
resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
}
Expand Down

0 comments on commit 31db70e

Please sign in to comment.