From fddfa30dac22026a643dcedd1fc1dea2fb0ba5d4 Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 13 Nov 2024 19:57:24 +0800 Subject: [PATCH] fix batch block --- .../flink/sink/batch/DorisBatchStreamLoad.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 3cfda6041..5356d7f33 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -178,7 +178,7 @@ public DorisBatchStreamLoad( * @param record * @throws IOException */ - public synchronized void writeRecord(String database, String table, byte[] record) { + public void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); @@ -228,15 +228,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor } } - public synchronized boolean bufferFullFlush(String bufferKey) { + public boolean bufferFullFlush(String bufferKey) { return doFlush(bufferKey, false, true); } - public synchronized boolean intervalFlush() { + public boolean intervalFlush() { return doFlush(null, false, false); } - public synchronized boolean checkpointFlush() { + public boolean checkpointFlush() { return doFlush(null, true, false); } @@ -408,11 +408,6 @@ public void run() { load(bf.getLabelName(), bf); } } - - if (flushQueue.size() < flushQueueSize) { - // Avoid waiting for 2 rounds of intervalMs - doFlush(null, false, false); - } } catch (Exception e) { LOG.error("worker running error", e); exception.set(e);