Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging improvements for topic reader & writer #359

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tech.ydb.topic.impl;

import java.util.UUID;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -22,6 +22,9 @@ public abstract class GrpcStreamRetrier {
private static final int EXP_BACKOFF_BASE_MS = 256;
private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec)
private static final int EXP_BACKOFF_MAX_POWER = 7;
private static final int ID_LENGTH = 6;
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
.toCharArray();

protected final String id;
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
Expand All @@ -31,14 +34,22 @@ public abstract class GrpcStreamRetrier {

protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
this.id = UUID.randomUUID().toString();
this.id = generateRandomId(ID_LENGTH);
}

protected abstract Logger getLogger();
protected abstract String getStreamName();
protected abstract void onStreamReconnect();
protected abstract void onShutdown(String reason);

protected static String generateRandomId(int length) {
return new Random().ints(0, ID_ALPHABET.length)
.limit(length)
.map(charId -> ID_ALPHABET[charId])
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

private void tryScheduleReconnect() {
int currentReconnectCounter = reconnectCounter.get() + 1;
if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) {
Expand All @@ -49,8 +60,8 @@ private void tryScheduleReconnect() {
shutdownImpl(errorMessage);
return;
} else {
getLogger().debug("[{}] Maximum retry count ({}}) exceeded. But {} is already shut down.", id,
MAX_RECONNECT_COUNT, getStreamName());
getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " +
"shut down.", id, MAX_RECONNECT_COUNT, getStreamName());
}
}
if (isReconnecting.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public CompletableFuture<Void> commit() {

public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
logger.debug("[{}] Committing {} message(s), offsets [{},{})" + (fromCommitter ? " from Committer" : ""),
partitionSession.getFullId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd());
}
return partitionSession.commitOffsetRange(offsetsToCommit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ private void add(OffsetsRange offsetRange) {
rangesLock.unlock();
}
} catch (RuntimeException exception) {
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
exception.getMessage();
String errorMessage = "[" + partitionSession.getFullId() + "] Error adding new offset range to " +
"DeferredCommitter for partition session " + partitionSession.getId() + " (partition " +
partitionSession.getPartitionId() + "): " + exception.getMessage();
logger.error(errorMessage);
throw new RuntimeException(errorMessage, exception);
}
Expand Down
Loading
Loading