diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 59781ac2..3913f7e3 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -1,6 +1,6 @@ package tech.ydb.topic.impl; -import java.util.UUID; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -22,6 +22,9 @@ public abstract class GrpcStreamRetrier { private static final int EXP_BACKOFF_BASE_MS = 256; private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec) private static final int EXP_BACKOFF_MAX_POWER = 7; + private static final int ID_LENGTH = 6; + private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" + .toCharArray(); protected final String id; protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); @@ -31,7 +34,7 @@ public abstract class GrpcStreamRetrier { protected GrpcStreamRetrier(ScheduledExecutorService scheduler) { this.scheduler = scheduler; - this.id = UUID.randomUUID().toString(); + this.id = generateRandomId(ID_LENGTH); } protected abstract Logger getLogger(); @@ -39,6 +42,14 @@ protected GrpcStreamRetrier(ScheduledExecutorService scheduler) { protected abstract void onStreamReconnect(); protected abstract void onShutdown(String reason); + protected static String generateRandomId(int length) { + return new Random().ints(0, ID_ALPHABET.length) + .limit(length) + .map(charId -> ID_ALPHABET[charId]) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + private void tryScheduleReconnect() { int currentReconnectCounter = reconnectCounter.get() + 1; if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) { @@ -49,8 +60,8 @@ private void tryScheduleReconnect() { shutdownImpl(errorMessage); return; } else { - getLogger().debug("[{}] Maximum retry count ({}}) exceeded. But {} is already shut down.", id, - MAX_RECONNECT_COUNT, getStreamName()); + getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " + + "shut down.", id, MAX_RECONNECT_COUNT, getStreamName()); } } if (isReconnecting.compareAndSet(false, true)) { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index 512ee870..05fe2885 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -29,10 +29,8 @@ public CompletableFuture commit() { public CompletableFuture commitImpl(boolean fromCommitter) { if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" + - " [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(), - partitionSession.getId(), partitionSession.getPartitionId(), messageCount, - offsetsToCommit.getStart(), offsetsToCommit.getEnd()); + logger.debug("[{}] Committing {} message(s), offsets [{},{})" + (fromCommitter ? " from Committer" : ""), + partitionSession.getFullId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd()); } return partitionSession.commitOffsetRange(offsetsToCommit); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java index ee3dfa17..702ff1a5 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -41,9 +41,9 @@ private void add(OffsetsRange offsetRange) { rangesLock.unlock(); } } catch (RuntimeException exception) { - String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + - partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " + - exception.getMessage(); + String errorMessage = "[" + partitionSession.getFullId() + "] Error adding new offset range to " + + "DeferredCommitter for partition session " + partitionSession.getId() + " (partition " + + partitionSession.getPartitionId() + "): " + exception.getMessage(); logger.error(errorMessage); throw new RuntimeException(errorMessage, exception); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 0dbf09d0..6e28f86c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -38,7 +38,9 @@ public class PartitionSessionImpl { private static final Logger logger = LoggerFactory.getLogger(PartitionSessionImpl.class); private final long id; - private final String path; + private final String fullId; + private final String topicPath; + private final String consumerName; private final long partitionId; private final PartitionSession sessionInfo; private final Executor decompressionExecutor; @@ -58,17 +60,19 @@ public class PartitionSessionImpl { private PartitionSessionImpl(Builder builder) { this.id = builder.id; - this.path = builder.path; + this.fullId = builder.fullId; + this.topicPath = builder.topicPath; + this.consumerName = builder.consumerName; this.partitionId = builder.partitionId; - this.sessionInfo = new PartitionSession(id, partitionId, path); + this.sessionInfo = new PartitionSession(id, partitionId, topicPath); this.lastReadOffset = builder.committedOffset; this.lastCommittedOffset = builder.committedOffset; this.decompressionExecutor = builder.decompressionExecutor; this.dataEventCallback = builder.dataEventCallback; this.commitFunction = builder.commitFunction; - logger.info("[{}] Partition session {} (partition {}) is started. CommittedOffset: {}. " + - "Partition offsets: {}-{}", path, id, partitionId, lastReadOffset, builder.partitionOffsets.getStart(), - builder.partitionOffsets.getEnd()); + logger.info("[{}] Partition session is started for Topic \"{}\" and Consumer \"{}\". CommittedOffset: {}. " + + "Partition offsets: {}-{}", fullId, topicPath, consumerName, lastReadOffset, + builder.partitionOffsets.getStart(), builder.partitionOffsets.getEnd()); } public static Builder newBuilder() { @@ -79,12 +83,16 @@ public long getId() { return id; } + public long getFullId() { + return id; + } + public long getPartitionId() { return partitionId; } - public String getPath() { - return path; + public String getTopicPath() { + return topicPath; } public PartitionSession getSessionInfo() { @@ -110,13 +118,11 @@ public CompletableFuture addBatches(List batchMessages = batch.getMessageDataList(); if (!batchMessages.isEmpty()) { if (logger.isDebugEnabled()) { - logger.debug("[{}] Received a batch of {} messages (offsets {} - {}) for partition session {} " + - "(partition {})", path, batchMessages.size(), batchMessages.get(0).getOffset(), - batchMessages.get(batchMessages.size() - 1).getOffset(), id, partitionId); + logger.debug("[{}] Received a batch of {} messages (offsets {} - {})", fullId, batchMessages.size(), + batchMessages.get(0).getOffset(), batchMessages.get(batchMessages.size() - 1).getOffset()); } } else { - logger.error("[{}] Received empty batch for partition session {} (partition {}). This shouldn't happen", - path, id, partitionId); + logger.error("[{}] Received empty batch. This shouldn't happen", fullId); } batchMessages.forEach(messageData -> { long commitOffsetFrom = lastReadOffset; @@ -125,14 +131,12 @@ public CompletableFuture addBatches(List lastReadOffset) { lastReadOffset = newReadOffset; if (logger.isTraceEnabled()) { - logger.trace("[{}] Received a message with offset {} for partition session {} " + - "(partition {}). lastReadOffset is now {}", path, messageOffset, id, - partitionId, lastReadOffset); + logger.trace("[{}] Received a message with offset {}. lastReadOffset is now {}", fullId, + messageOffset, lastReadOffset); } } else { - logger.error("[{}] Received a message with offset {} which is less than last read offset {} " + - "for partition session {} (partition {})", path, messageOffset, lastReadOffset, id, - partitionId); + logger.error("[{}] Received a message with offset {} which is less than last read offset {} ", + fullId, messageOffset, lastReadOffset); } newBatch.addMessage(new MessageImpl.Builder() .setBatchMeta(batchMeta) @@ -175,10 +179,9 @@ public CompletableFuture addBatches(List messages = decodingBatch.getMessages(); - logger.trace("[{}] Adding batch with offsets {}-{} to reading queue of " + - "partition session {} (partition {})", path, + logger.trace("[{}] Adding batch with offsets {}-{} to reading queue", fullId, messages.get(0).getOffset(), - messages.get(messages.size() - 1).getOffset(), id, partitionId); + messages.get(messages.size() - 1).getOffset()); } readingQueue.add(decodingBatch); haveNewBatchesReady = true; @@ -206,18 +209,16 @@ public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { try { if (isWorking.get()) { if (logger.isDebugEnabled()) { - logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " + - "(partition {}). Last committed offset is {} (commit lag is {})", path, - rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId, lastCommittedOffset, - rangeToCommit.getStart() - lastCommittedOffset); + logger.debug("[{}] Offset range [{}, {}) is requested to be committed. Last committed offset is" + + " {} (commit lag is {})", fullId, rangeToCommit.getStart(), rangeToCommit.getEnd(), + lastCommittedOffset, rangeToCommit.getStart() - lastCommittedOffset); } commitFutures.put(rangeToCommit.getEnd(), resultFuture); } else { - logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} " + - "(partition {}) is already closed", path, rangeToCommit.getStart(), rangeToCommit.getEnd(), id, - partitionId); + logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session " + + "is already closed", fullId, rangeToCommit.getStart(), rangeToCommit.getEnd()); resultFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + - partitionId + ") for " + path + " is already closed")); + partitionId + ") for " + topicPath + " is already closed")); return resultFuture; } } finally { @@ -233,18 +234,16 @@ public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { public void commitOffsetRanges(List rangesToCommit) { if (isWorking.get()) { if (logger.isInfoEnabled()) { - StringBuilder message = new StringBuilder("[").append(path) - .append("] Sending CommitRequest for partition session ").append(id) - .append(" (partition ").append(partitionId).append(") with offset ranges "); + StringBuilder message = new StringBuilder("[").append(fullId) + .append("] Sending CommitRequest with offset ranges "); addRangesToString(message, rangesToCommit); logger.debug(message.toString()); } commitFunction.accept(rangesToCommit); } else if (logger.isInfoEnabled()) { - StringBuilder message = new StringBuilder("[").append(path).append("] Offset ranges "); + StringBuilder message = new StringBuilder("[").append(fullId).append("] Offset ranges "); addRangesToString(message, rangesToCommit); - message.append(" are requested to be committed, but partition session ").append(id) - .append(" (partition ").append(partitionId).append(") is already closed"); + message.append(" are requested to be committed, but partition session is already closed"); logger.info(message.toString()); } } @@ -261,17 +260,15 @@ private static void addRangesToString(StringBuilder stringBuilder, List> futuresToComplete = commitFutures.headMap(committedOffset, true); if (logger.isDebugEnabled()) { - logger.debug("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + - ". Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures", - path, id, partitionId, committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset, - futuresToComplete.size()); + logger.debug("[{}] Commit response received. Committed offset: {}. Previous committed offset: {} " + + "(diff is {} message(s)). Completing {} commit futures", fullId, committedOffset, + lastCommittedOffset, committedOffset - lastCommittedOffset, futuresToComplete.size()); } lastCommittedOffset = committedOffset; futuresToComplete.values().forEach(future -> future.complete(null)); @@ -280,7 +277,7 @@ public void handleCommitResponse(long committedOffset) { private void decode(Batch batch) { if (logger.isTraceEnabled()) { - logger.trace("[{}] Started decoding batch for partition session {} (partition {})", path, id, partitionId); + logger.trace("[{}] Started decoding batch", fullId); } if (batch.getCodec() == Codec.RAW) { return; @@ -292,14 +289,13 @@ private void decode(Batch batch) { message.setDecompressed(true); } catch (IOException exception) { message.setException(exception); - logger.info("[{}] Exception was thrown while decoding a message in partition session {} " + - "(partition {})", path, id, partitionId); + logger.warn("[{}] Exception was thrown while decoding a message: ", fullId, exception); } }); batch.setDecompressed(true); if (logger.isTraceEnabled()) { - logger.trace("[{}] Finished decoding batch for partition session {} (partition {})", path, id, partitionId); + logger.trace("[{}] Finished decoding batch", fullId); } } @@ -320,23 +316,22 @@ private void sendDataToReadersIfNeeded() { messageImplList.get(messageImplList.size() - 1).getOffset() + 1); DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { - logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition " + - "session {} " + "(partition {}) is about to be called...", path, messagesToRead.size(), - messagesToRead.get(0).getOffset(), messagesToRead.get(messagesToRead.size() - 1).getOffset(), - id, partitionId); + logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " + + "to be called...", fullId, messagesToRead.size(), messagesToRead.get(0).getOffset(), + messagesToRead.get(messagesToRead.size() - 1).getOffset()); } dataEventCallback.apply(event) .whenComplete((res, th) -> { if (th != null) { - logger.error("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for " + - "partition session {} (partition {}) finished with error: ", path, - messagesToRead.size(), messagesToRead.get(0).getOffset(), - messagesToRead.get(messagesToRead.size() - 1).getOffset(), id, partitionId, th); + logger.error("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) finished" + + " with error: ", fullId, messagesToRead.size(), + messagesToRead.get(0).getOffset(), + messagesToRead.get(messagesToRead.size() - 1).getOffset(), th); } else if (logger.isDebugEnabled()) { - logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for " + - "partition session {} (partition {}) successfully finished", path, - messagesToRead.size(), messagesToRead.get(0).getOffset(), - messagesToRead.get(messagesToRead.size() - 1).getOffset(), id, partitionId); + logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) " + + "successfully finished", fullId, messagesToRead.size(), + messagesToRead.get(0).getOffset(), + messagesToRead.get(messagesToRead.size() - 1).getOffset()); } isReadingNow.set(false); batchToRead.complete(); @@ -344,8 +339,7 @@ private void sendDataToReadersIfNeeded() { }); } else { if (logger.isTraceEnabled()) { - logger.trace("[{}] Partition session {} (partition {}) - no need to send data to readers: " + - "reading is already being performed", path, id, partitionId); + logger.trace("[{}] No need to send data to readers: reading is already being performed", fullId); } } } @@ -355,10 +349,10 @@ public void shutdown() { try { isWorking.set(false); - logger.info("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures...", path, - id, partitionId, commitFutures.size()); + logger.info("[{}] Partition session for {} is shutting down. Failing {} commit futures...", fullId, + topicPath, commitFutures.size()); commitFutures.values().forEach(f -> f.completeExceptionally(new RuntimeException("Partition session " + id + - " (partition " + partitionId + ") for " + path + " is closed"))); + " (partition " + partitionId + ") for " + topicPath + " is closed"))); } finally { commitFuturesLock.unlock(); } @@ -378,7 +372,9 @@ public void shutdown() { */ public static class Builder { private long id; - private String path; + private String fullId; + private String topicPath; + private String consumerName; private long partitionId; private long committedOffset; private OffsetsRange partitionOffsets; @@ -391,8 +387,18 @@ public Builder setId(long id) { return this; } - public Builder setPath(String path) { - this.path = path; + public Builder setFullId(String fullId) { + this.fullId = fullId; + return this; + } + + public Builder setTopicPath(String topicPath) { + this.topicPath = topicPath; + return this; + } + + public Builder setConsumerName(String consumerName) { + this.consumerName = consumerName; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index ae5c1ccc..a57d8b6c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -52,6 +52,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { // Every reading stream has a sequential number (for debug purposes) private final AtomicLong seqNumberCounter = new AtomicLong(0); + private final String consumerName; public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { super(topicRpc.getScheduler()); @@ -70,7 +71,7 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { message.append(" \"").append(settings.getReaderName()).append("\""); } message.append(" (generated id ").append(id).append(")"); - message.append(" created for topic(s): "); + message.append(" created for Topic(s) "); for (TopicReadSettings topic : settings.getTopics()) { if (topic != settings.getTopics().get(0)) { message.append(", "); @@ -78,9 +79,11 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { message.append("\"").append(topic.getPath()).append("\""); } if (settings.getConsumerName() != null) { - message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\""); + message.append(" and Consumer \"").append(settings.getConsumerName()); + consumerName = settings.getConsumerName(); } else { message.append(" without a Consumer"); + consumerName = "NoConsumer"; } logger.info(message.toString()); } @@ -209,7 +212,7 @@ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransactio } protected class ReadSessionImpl extends ReadSession { - protected String sessionId = ""; + protected String sessionId; // Total size to request with next ReadRequest. // Used to group several ReadResponses in one on high rps private final AtomicLong sizeBytesToRequest = new AtomicLong(0); @@ -220,7 +223,7 @@ private ReadSessionImpl() { @Override public void startAndInitialize() { - logger.debug("[{}] Session {} startAndInitialize called", streamId, sessionId); + logger.debug("[{}] Session startAndInitialize called", streamId); start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest @@ -273,15 +276,13 @@ private void sendReadRequest() { private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSession, StartPartitionSessionSettings startSettings) { if (!isWorking.get()) { - logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," + - " but reading session is already closed", streamId, partitionSession.getId(), - partitionSession.getPartitionId()); + logger.info("[{}] Need to send StartPartitionSessionResponse, but reading session is already closed", + partitionSession.getFullId()); return; } if (!partitionSessions.containsKey(partitionSession.getId())) { - logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," + - " but have no such partition session active", streamId, partitionSession.getId(), - partitionSession.getPartitionId()); + logger.info("[{}] Need to send StartPartitionSessionResponse, but have no such active partition " + + "session anymore", partitionSession.getFullId()); return; } YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder responseBuilder = @@ -301,9 +302,8 @@ private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSes partitionSession.setLastCommittedOffset(userDefinedCommitOffset); } } - logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})" + - " with readOffset {} and commitOffset {}", streamId, partitionSession.getId(), - partitionSession.getPartitionId(), userDefinedReadOffset, userDefinedCommitOffset); + logger.info("[{}] Sending StartPartitionSessionResponse with readOffset {} and commitOffset {}", + partitionSession.getFullId(), userDefinedReadOffset, userDefinedCommitOffset); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setStartPartitionSessionResponse(responseBuilder.build()) .build()); @@ -318,8 +318,7 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { PartitionSessionImpl partitionSession = partitionSessions.remove(partitionSessionId); if (partitionSession != null) { partitionSession.shutdown(); - logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})", - streamId, partitionSessionId, partitionSession.getPartitionId()); + logger.info("[{}] Sending StopPartitionSessionResponse", partitionSession.getFullId()); } else { logger.warn("[{}] Sending StopPartitionSessionResponse for partition session {}, " + "but have no such partition session active", streamId, partitionSessionId); @@ -392,14 +391,17 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) { private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) { long partitionSessionId = request.getPartitionSession().getPartitionSessionId(); long partitionId = request.getPartitionSession().getPartitionId(); - logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {}) " + - "with committedOffset {} and partitionOffsets [{}-{})", streamId, + String partitionSessionFullId = streamId + '/' + partitionSessionId + "-p" + partitionId; + logger.info("[{}] Received StartPartitionSessionRequest for Topic \"{}\" and Consumer \"{}\". " + + "Partition session {} (partition {}) with committedOffset {} and partitionOffsets [{}-{})", + partitionSessionFullId, request.getPartitionSession().getPath(), consumerName, partitionSessionId, partitionId, request.getCommittedOffset(), request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd()); PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder() .setId(partitionSessionId) - .setPath(request.getPartitionSession().getPath()) + .setFullId(partitionSessionFullId) + .setTopicPath(request.getPartitionSession().getPath()) .setPartitionId(partitionId) .setCommittedOffset(request.getCommittedOffset()) .setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(), @@ -418,8 +420,7 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart if (request.getGraceful()) { PartitionSessionImpl partitionSession = partitionSessions.get(request.getPartitionSessionId()); if (partitionSession != null) { - logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} " + - "(partition {})", streamId, partitionSession.getId(), partitionSession.getPartitionId()); + logger.info("[{}] Received graceful StopPartitionSessionRequest", partitionSession.getFullId()); handleStopPartitionSession(request, partitionSession.getSessionInfo(), () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())); } else { @@ -433,8 +434,7 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart } else { PartitionSessionImpl partitionSession = partitionSessions.remove(request.getPartitionSessionId()); if (partitionSession != null) { - logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition " + - "{})", streamId, partitionSession.getId(), partitionSession.getPartitionId()); + logger.info("[{}] Received force StopPartitionSessionRequest", partitionSession.getFullId()); closePartitionSession(partitionSession); } else { logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, " + diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 24f4e108..9a52e9a9 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -336,7 +336,7 @@ protected void onShutdown(String reason) { } private class WriteSessionImpl extends WriteSession { - protected String sessionId = ""; + protected String sessionId; private final MessageSender messageSender; private final AtomicBoolean isInitialized = new AtomicBoolean(false); @@ -347,7 +347,7 @@ private WriteSessionImpl() { @Override public void startAndInitialize() { - logger.debug("[{}] Session {} startAndInitialize called", streamId, sessionId); + logger.debug("[{}] Session startAndInitialize called", streamId); start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamWriteMessage.InitRequest @@ -375,8 +375,7 @@ public void startAndInitialize() { private void sendDataRequestIfNeeded() { while (true) { if (!isInitialized.get()) { - logger.debug("[{}] Can't send data: current session is not yet initialized", - streamId); + logger.debug("[{}] Can't send data: current session is not yet initialized", streamId); return; } if (!isWorking.get()) { @@ -412,9 +411,11 @@ private void sendDataRequestIfNeeded() { private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { sessionId = response.getSessionId(); - logger.info("[{}] Session {} initialized", streamId, sessionId); long lastSeqNo = response.getLastSeqNo(); long actualLastSeqNo = lastSeqNo; + logger.info("[{}] Session with id {} (partition {}) initialized for topic \"{}\", lastSeqNo {}," + + " actualLastSeqNo {}", streamId, sessionId, response.getPartitionId(), + settings.getTopicPath(), lastSeqNo, actualLastSeqNo); // If there are messages that were already sent before reconnect but haven't received acks, // their highest seqNo should also be taken in consideration when calculating next seqNo automatically if (!sentMessages.isEmpty()) { @@ -425,7 +426,7 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { // TODO: remember supported codecs for further validation if (!sentMessages.isEmpty()) { // resending messages that haven't received acks yet - logger.info("Resending {} messages that haven't received ack's yet into new session...", + logger.info("[{}] Resending {} messages that haven't received ack's yet into new session...", streamId, sentMessages.size()); messageSender.sendMessages(sentMessages); }