Skip to content

Commit

Permalink
Fix reader hdfs constant partition reassignement (#227)
Browse files Browse the repository at this point in the history
* Fix reader hdfs constant partition reassignement

  - upgrade flink client to latest version: new clients handle more gracefully cluster rebalancing. Instead of stopping all kafka consumers, only the one concerned by the rebalance will be stopped and have their offset resetted
  - expire consumers using a thread pool: this aims at lowering the time it takes to write files to hdfs by parallelizing those operations
  - Fix race on latestMessageTimeForPartitionAndDay access

* Remove heuristics module as it is not used anymore and fails the build

* Fix checkstyle

---------

Co-authored-by: w.montaz <[email protected]>
  • Loading branch information
Willymontaz and w.montaz authored Nov 27, 2023
1 parent de3ea0f commit 5d6215a
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
<java-hamcrest.version>2.0.0.0</java-hamcrest.version>
<slf4j.version>1.7.26</slf4j.version>
<protobuf.version>3.9.0</protobuf.version>
<kafka.version>2.0.1</kafka.version>
<kafka.version>3.5.1</kafka.version>
<guava.version>20.0</guava.version>
<parquet-protobuf.version>1.10.1</parquet-protobuf.version>
<protobuf-dynamic.version>1.0.0</protobuf-dynamic.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -37,6 +42,14 @@ public class PartitionedWriter<MESSAGE_KIND> implements Closeable {
private static final ZoneId UTC_ZONE = ZoneId.of("UTC");
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionedWriter.class);

// A pool of worker dedicated to writing files to HDFS. Allows the reader to block for less time
// the pool is static to avoid creating too many pools
private static final ExecutorService CONSUMER_CLOSER_THREADS = Executors.newCachedThreadPool();

static {
Runtime.getRuntime().addShutdownHook(new Thread(CONSUMER_CLOSER_THREADS::shutdown));
}

private final BiFunction<Integer, LocalDateTime, ExpiringConsumer<MESSAGE_KIND>> writerBuilder;
private final OffsetComputer offsetComputer;
private final Map<Integer, Map<LocalDateTime, ExpiringConsumer<MESSAGE_KIND>>> perPartitionDayWriters = new HashMap<>();
Expand Down Expand Up @@ -92,12 +105,12 @@ public void write(Instant when, Offset offset, MESSAGE_KIND msg) throws IOExcept
final AbstractMap.SimpleEntry<Integer, LocalDateTime> dayAndPartition = new AbstractMap.SimpleEntry<>(
partitionId, dayStartTime);

if (!latestMessageTimeForPartitionAndDay.containsKey(dayAndPartition) ||
when.isAfter(latestMessageTimeForPartitionAndDay.get(dayAndPartition))) {
latestMessageTimeForPartitionAndDay.put(dayAndPartition, when);
}

synchronized (perPartitionDayWriters) {
if (!latestMessageTimeForPartitionAndDay.containsKey(dayAndPartition) ||
when.isAfter(latestMessageTimeForPartitionAndDay.get(dayAndPartition))) {
latestMessageTimeForPartitionAndDay.put(dayAndPartition, when);
}

if (shouldSkipOffset(offset.getOffset(), partitionId)) return;

// /!\ This line must not be switched with the offset computation as this would create empty files otherwise
Expand Down Expand Up @@ -152,8 +165,8 @@ public Map<Integer, Long> getStartingOffsets(Collection<Integer> partitionsId) t
}

return partitionsId.stream()
.filter(perPartitionStartOffset::containsKey)
.collect(Collectors.toMap(Function.identity(), perPartitionStartOffset::get));
.filter(perPartitionStartOffset::containsKey)
.collect(Collectors.toMap(Function.identity(), perPartitionStartOffset::get));
}
}

Expand All @@ -177,13 +190,13 @@ public void heartbeat(int partition, Offset offset) {

try {
if ((!perPartitionDayWriters.containsKey(partition) || perPartitionDayWriters.get(partition).isEmpty())
&& !shouldSkipOffset(offset.getOffset(), partition)) {
&& !shouldSkipOffset(offset.getOffset(), partition)) {
final ExpiringConsumer<MESSAGE_KIND> heartbeatWriter = writerBuilder.apply(offset.getPartition(), LocalDateTime.now());

long now = System.currentTimeMillis();
MESSAGE_KIND msg = (MESSAGE_KIND) ProtoConcatenator
.concatToProtobuf(now, offset.getOffset(), Arrays.asList(emptyHeader, emptyMessageBuilder.build()))
.build();
.concatToProtobuf(now, offset.getOffset(), Arrays.asList(emptyHeader, emptyMessageBuilder.build()))
.build();

heartbeatWriter.write(now, msg, offset);

Expand All @@ -200,42 +213,105 @@ public void heartbeat(int partition, Offset offset) {
}
}

private void possiblyCloseConsumers(Predicate<ExpiringConsumer> shouldClose) {
private void possiblyCloseConsumers(Predicate<ExpiringConsumer<MESSAGE_KIND>> shouldClose) {
synchronized (perPartitionDayWriters) {
perPartitionDayWriters.forEach((partitionId, dailyWriters) ->
dailyWriters.entrySet().removeIf(entry -> {
final ExpiringConsumer<MESSAGE_KIND> consumer = entry.getValue();
final LocalDateTime day = entry.getKey();

if (shouldClose.test(consumer)) {
if (tryExpireConsumer(consumer)) {
final Counter.Child filesCommitted = PrometheusMetrics.filesCommittedCounter(eventName);
final Counter.Child checkpointsFailures = PrometheusMetrics.checkpointFailuresCounter(eventName, partitionId);
final Counter.Child checkpointsSuccesses = PrometheusMetrics.checkpointSuccessesCounter(eventName, partitionId);

filesCommitted.inc();

try {
checkpointer.tryCheckpoint(partitionId, latestMessageTimeForPartitionAndDay.get(
new AbstractMap.SimpleEntry<>(partitionId, day)));
} catch (RuntimeException e) {
String msg = String.format("Failed to checkpoint partition %d, date %s, event %s",
partitionId, day.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
eventName);

LOGGER.warn(msg, e);
checkpointsFailures.inc();

// We create a bunch of async tasks that will try to close the consumers
List<CompletableFuture<CloseConsumerTaskResult>> futureResults = perPartitionDayWriters.entrySet().stream().flatMap(partitionAndWriters -> {
Integer partitionId = partitionAndWriters.getKey();
Map<LocalDateTime, ExpiringConsumer<MESSAGE_KIND>> dailyWriters = partitionAndWriters.getValue();

return dailyWriters.entrySet().stream().map(e -> CompletableFuture.supplyAsync(
new CloseConsumerTask(shouldClose, partitionId, e.getKey(), e.getValue()),
CONSUMER_CLOSER_THREADS
));
}).collect(Collectors.toList());

CompletableFuture
// We wait for all those tasks to complete
.allOf(futureResults.toArray(new CompletableFuture<?>[0]))
// Upon completion, we remove the consumers if relevant
// we do this only once all futures have completed
// to avoid race condition on the map object we modify.
// If an error occurred, we log it. This way we make sure not to throw to early and miss
// cleaning work as well as logging all errors
.whenComplete((ignored1, ignored2) -> futureResults.forEach(futureResult -> {
try {
CloseConsumerTaskResult result = futureResult.get();
// If the consumer was properly closed, remove it
if (result.closed) {
perPartitionDayWriters.get(result.partitionId).remove(result.day);
}
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("A consumer could not be closed properly", e);
}
})).join(); // return nothing or throws if any error occurred
}
}

checkpointsSuccesses.inc();
private final static class CloseConsumerTaskResult {
private final Integer partitionId;
private final LocalDateTime day;

return true;
}
private final boolean closed;

private CloseConsumerTaskResult(Integer partitionId, LocalDateTime day, boolean closed) {
this.partitionId = partitionId;
this.day = day;
this.closed = closed;
}
}

private final class CloseConsumerTask implements Supplier<CloseConsumerTaskResult> {

private final Predicate<ExpiringConsumer<MESSAGE_KIND>> shouldClose;
private final Integer partitionId;
private final LocalDateTime day;
private final ExpiringConsumer<MESSAGE_KIND> consumer;

private CloseConsumerTask(
Predicate<ExpiringConsumer<MESSAGE_KIND>> shouldClose,
Integer partitionId,
LocalDateTime day,
ExpiringConsumer<MESSAGE_KIND> consumer
) {
this.shouldClose = shouldClose;
this.partitionId = partitionId;
this.day = day;
this.consumer = consumer;
}

@Override
public CloseConsumerTaskResult get() {
boolean closed = false;
if (shouldClose.test(consumer)) {
if (tryExpireConsumer(consumer)) {
final Counter.Child filesCommitted = PrometheusMetrics.filesCommittedCounter(eventName);
final Counter.Child checkpointsFailures = PrometheusMetrics.checkpointFailuresCounter(eventName, partitionId);
final Counter.Child checkpointsSuccesses = PrometheusMetrics.checkpointSuccessesCounter(eventName, partitionId);

filesCommitted.inc();

try {
checkpointer.tryCheckpoint(partitionId, latestMessageTimeForPartitionAndDay.get(
new AbstractMap.SimpleEntry<>(partitionId, day)));
} catch (RuntimeException e) {
String msg = String.format("Failed to checkpoint partition %d, date %s, event %s",
partitionId, day.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
eventName);

LOGGER.warn(msg, e);
checkpointsFailures.inc();
}

return false;
}));
checkpointsSuccesses.inc();

closed = true;
}
}
return new CloseConsumerTaskResult(partitionId, day, closed);
}

}

private boolean tryExpireConsumer(ExpiringConsumer<MESSAGE_KIND> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ public void closingExceptionalConsumerThrowExceptionAfter5Retries() throws IOExc
partitionedWriter.close();
} catch (RuntimeException re) {
throwException = true;
assertEquals("Couldn't close writer for ignored", re.getMessage());
}

assertTrue(throwException);
Expand Down
1 change: 0 additions & 1 deletion readers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<modules>
<module>common</module>
<module>elasticsearch</module>
<module>heuristics</module>
<module>hdfs</module>
</modules>
</project>

0 comments on commit 5d6215a

Please sign in to comment.