From ccb2a177404e7089212ac87bc3a465fd7507f5fc Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 13 Jan 2025 15:59:50 +0000 Subject: [PATCH 1/2] Added lost tracing --- .../java/tech/ydb/core/impl/call/ReadWriteStreamCall.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java index 609e3eb7..761a3610 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java @@ -88,6 +88,10 @@ public CompletableFuture start(Observer observer) { public void sendNext(W message) { synchronized (call) { if (flush()) { + if (logger.isTraceEnabled()) { + String msg = TextFormat.shortDebugString((Message) message); + logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, msg); + } call.sendMessage(message); } else { messagesQueue.add(message); @@ -103,7 +107,8 @@ private boolean flush() { } if (logger.isTraceEnabled()) { - logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) next)); + String msg = TextFormat.shortDebugString((Message) next); + logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, msg); } call.sendMessage(next); } From 9c9863925aa2a904294c8d370e17fb4e7703829c Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 13 Jan 2025 16:12:51 +0000 Subject: [PATCH 2/2] Set traceId for topic sessions --- .../main/java/tech/ydb/topic/TopicRpc.java | 8 +- .../tech/ydb/topic/impl/GrpcTopicRpc.java | 24 +++--- .../tech/ydb/topic/read/impl/ReadSession.java | 7 +- .../tech/ydb/topic/read/impl/ReaderImpl.java | 80 +++++++++---------- .../ydb/topic/write/impl/WriteSession.java | 7 +- .../tech/ydb/topic/write/impl/WriterImpl.java | 39 +++++---- 6 files changed, 87 insertions(+), 78 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/TopicRpc.java b/topic/src/main/java/tech/ydb/topic/TopicRpc.java index 556388cc..6adeeef2 100644 --- a/topic/src/main/java/tech/ydb/topic/TopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/TopicRpc.java @@ -75,9 +75,13 @@ CompletableFuture> describeConsumer( CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request, GrpcRequestSettings settings); - GrpcReadWriteStream writeSession(); + GrpcReadWriteStream writeSession( + String traceId + ); - GrpcReadWriteStream readSession(); + GrpcReadWriteStream readSession( + String traceId + ); ScheduledExecutorService getScheduler(); } diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java index 3789b398..68ca6223 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java @@ -91,21 +91,21 @@ public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffse } @Override - public GrpcReadWriteStream< - YdbTopic.StreamWriteMessage.FromServer, - YdbTopic.StreamWriteMessage.FromClient - > writeSession() { - return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(), - GrpcRequestSettings.newBuilder().build()); + public GrpcReadWriteStream + writeSession(String streamId) { + return transport.readWriteStreamCall( + TopicServiceGrpc.getStreamWriteMethod(), + GrpcRequestSettings.newBuilder().withTraceId(streamId).build() + ); } @Override - public GrpcReadWriteStream< - YdbTopic.StreamReadMessage.FromServer, - YdbTopic.StreamReadMessage.FromClient - > readSession() { - return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(), - GrpcRequestSettings.newBuilder().build()); + public GrpcReadWriteStream + readSession(String streamId) { + return transport.readWriteStreamCall( + TopicServiceGrpc.getStreamReadMethod(), + GrpcRequestSettings.newBuilder().withTraceId(streamId).build() + ); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index 81cb10e2..52d26fcc 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -15,8 +15,11 @@ public abstract class ReadSession extends SessionBase { private static final Logger logger = LoggerFactory.getLogger(ReadSession.class); - public ReadSession(TopicRpc rpc) { - super(rpc.readSession()); + protected final String streamId; + + public ReadSession(TopicRpc rpc, String streamId) { + super(rpc.readSession(streamId)); + this.streamId = streamId; } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index ba1851f7..ae5c1ccc 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -66,7 +66,7 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { this.decompressionExecutor = defaultDecompressionExecutorService; } StringBuilder message = new StringBuilder("Reader"); - if (settings.getReaderName() != null && !settings.getReaderName().isEmpty()) { + if (settings.getReaderName() != null) { message.append(" \"").append(settings.getReaderName()).append("\""); } message.append(" (generated id ").append(id).append(")"); @@ -167,12 +167,12 @@ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransactio if (error != null) { currentSession.closeDueToError(null, new RuntimeException("Restarting read session due to transaction " + transaction.getId() + - " with partition offsets from read session " + currentSession.fullId + + " with partition offsets from read session " + currentSession.streamId + " was not committed with reason: " + error)); } else if (!status.isSuccess()) { currentSession.closeDueToError(null, new RuntimeException("Restarting read session due to transaction " + transaction.getId() + - " with partition offsets from read session " + currentSession.fullId + + " with partition offsets from read session " + currentSession.streamId + " was not committed with status: " + status)); } }); @@ -210,18 +210,17 @@ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransactio protected class ReadSessionImpl extends ReadSession { protected String sessionId = ""; - private final String fullId; // Total size to request with next ReadRequest. // Used to group several ReadResponses in one on high rps private final AtomicLong sizeBytesToRequest = new AtomicLong(0); private final Map partitionSessions = new ConcurrentHashMap<>(); private ReadSessionImpl() { - super(topicRpc); - this.fullId = id + '.' + seqNumberCounter.incrementAndGet(); + super(topicRpc, id + '.' + seqNumberCounter.incrementAndGet()); } + @Override public void startAndInitialize() { - logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId); + logger.debug("[{}] Session {} startAndInitialize called", streamId, sessionId); start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest @@ -245,8 +244,9 @@ public void startAndInitialize() { initRequestBuilder.addTopicsReadSettings(settingsBuilder); }); - if (settings.getReaderName() != null && !settings.getReaderName().isEmpty()) { - initRequestBuilder.setReaderName(settings.getReaderName()); + String readerName = settings.getReaderName(); + if (readerName != null && !readerName.isEmpty()) { + initRequestBuilder.setReaderName(readerName); } send(YdbTopic.StreamReadMessage.FromClient.newBuilder() @@ -257,11 +257,11 @@ public void startAndInitialize() { private void sendReadRequest() { long currentSizeBytesToRequest = sizeBytesToRequest.getAndSet(0); if (currentSizeBytesToRequest <= 0) { - logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", fullId, + logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", streamId, currentSizeBytesToRequest); return; } - logger.debug("[{}] Sending DataRequest with {} bytes", fullId, currentSizeBytesToRequest); + logger.debug("[{}] Sending DataRequest with {} bytes", streamId, currentSizeBytesToRequest); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setReadRequest(YdbTopic.StreamReadMessage.ReadRequest.newBuilder() @@ -274,13 +274,13 @@ private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSes StartPartitionSessionSettings startSettings) { if (!isWorking.get()) { logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," + - " but reading session is already closed", fullId, partitionSession.getId(), + " but reading session is already closed", streamId, partitionSession.getId(), partitionSession.getPartitionId()); return; } if (!partitionSessions.containsKey(partitionSession.getId())) { logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {})," + - " but have no such partition session active", fullId, partitionSession.getId(), + " but have no such partition session active", streamId, partitionSession.getId(), partitionSession.getPartitionId()); return; } @@ -302,7 +302,7 @@ private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSes } } logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})" + - " with readOffset {} and commitOffset {}", fullId, partitionSession.getId(), + " with readOffset {} and commitOffset {}", streamId, partitionSession.getId(), partitionSession.getPartitionId(), userDefinedReadOffset, userDefinedCommitOffset); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setStartPartitionSessionResponse(responseBuilder.build()) @@ -312,17 +312,17 @@ private void sendStartPartitionSessionResponse(PartitionSessionImpl partitionSes private void sendStopPartitionSessionResponse(long partitionSessionId) { if (!isWorking.get()) { logger.info("[{}] Need to send StopPartitionSessionResponse for partition session {}, " + - "but reading session is already closed", fullId, partitionSessionId); + "but reading session is already closed", streamId, partitionSessionId); return; } PartitionSessionImpl partitionSession = partitionSessions.remove(partitionSessionId); if (partitionSession != null) { partitionSession.shutdown(); - logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})", fullId, - partitionSessionId, partitionSession.getPartitionId()); + logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})", + streamId, partitionSessionId, partitionSession.getPartitionId()); } else { logger.warn("[{}] Sending StopPartitionSessionResponse for partition session {}, " + - "but have no such partition session active", fullId, partitionSessionId); + "but have no such partition session active", streamId, partitionSessionId); } send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setStopPartitionSessionResponse( @@ -336,7 +336,7 @@ private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, List rangesToCommit) { if (!isWorking.get()) { if (logger.isInfoEnabled()) { - StringBuilder message = new StringBuilder("[").append(fullId) + StringBuilder message = new StringBuilder("[").append(streamId) .append("] Need to send CommitRequest for partition session ").append(partitionSessionId) .append(" (partition ").append(partitionId).append(") with offset ranges "); for (int i = 0; i < rangesToCommit.size(); i++) { @@ -384,7 +384,7 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) { initResultFutureRef.get().complete(null); } sizeBytesToRequest.set(settings.getMaxMemoryUsageBytes()); - logger.info("[{}] Session {} initialized. Requesting {} bytes...", fullId, sessionId, + logger.info("[{}] Session {} initialized. Requesting {} bytes...", streamId, sessionId, settings.getMaxMemoryUsageBytes()); sendReadRequest(); } @@ -393,7 +393,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart long partitionSessionId = request.getPartitionSession().getPartitionSessionId(); long partitionId = request.getPartitionSession().getPartitionId(); logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {}) " + - "with committedOffset {} and partitionOffsets [{}-{})", fullId, + "with committedOffset {} and partitionOffsets [{}-{})", streamId, partitionSessionId, partitionId, request.getCommittedOffset(), request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd()); @@ -419,12 +419,12 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart PartitionSessionImpl partitionSession = partitionSessions.get(request.getPartitionSessionId()); if (partitionSession != null) { logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} " + - "(partition {})", fullId, partitionSession.getId(), partitionSession.getPartitionId()); + "(partition {})", streamId, partitionSession.getId(), partitionSession.getPartitionId()); handleStopPartitionSession(request, partitionSession.getSessionInfo(), () -> sendStopPartitionSessionResponse(request.getPartitionSessionId())); } else { logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " + - "but have no such partition session active", fullId, request.getPartitionSessionId()); + "but have no such partition session active", streamId, request.getPartitionSessionId()); closeDueToError(null, new RuntimeException("Restarting read session due to receiving " + "StopPartitionSessionRequest with PartitionSessionId " + @@ -434,18 +434,18 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart PartitionSessionImpl partitionSession = partitionSessions.remove(request.getPartitionSessionId()); if (partitionSession != null) { logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition " + - "{})", fullId, partitionSession.getId(), partitionSession.getPartitionId()); + "{})", streamId, partitionSession.getId(), partitionSession.getPartitionId()); closePartitionSession(partitionSession); } else { logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, " + - "but have no such partition session running", fullId, request.getPartitionSessionId()); + "but have no such partition session running", streamId, request.getPartitionSessionId()); } } } private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse) { final long responseBytesSize = readResponse.getBytesSize(); - logger.trace("[{}] Received ReadResponse of {} bytes", fullId, responseBytesSize); + logger.trace("[{}] Received ReadResponse of {} bytes", streamId, responseBytesSize); List> batchReadFutures = new ArrayList<>(); readResponse.getPartitionDataList().forEach( (YdbTopic.StreamReadMessage.ReadResponse.PartitionData data) -> { @@ -457,28 +457,28 @@ private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse batchReadFutures.add(readFuture); } else { logger.info("[{}] Received PartitionData for unknown(most likely already closed) " + - "PartitionSessionId={}", fullId, partitionId); + "PartitionSessionId={}", streamId, partitionId); } }); CompletableFuture.allOf(batchReadFutures.toArray(new CompletableFuture[0])) .whenComplete((res, th) -> { if (th != null) { - logger.error("[{}] Exception while waiting for batches to be read:", fullId, th); + logger.error("[{}] Exception while waiting for batches to be read:", streamId, th); } if (isWorking.get()) { logger.trace("[{}] Finished handling ReadResponse of {} bytes. Sending ReadRequest...", - fullId, responseBytesSize); + streamId, responseBytesSize); this.sizeBytesToRequest.addAndGet(responseBytesSize); sendReadRequest(); } else { logger.trace("[{}] Finished handling ReadResponse of {} bytes. Read session is already " + - "closed -- no need to send ReadRequest", fullId, responseBytesSize); + "closed -- no need to send ReadRequest", streamId, responseBytesSize); } }); } protected void onCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetResponse response) { - logger.trace("[{}] Received CommitOffsetResponse", fullId); + logger.trace("[{}] Received CommitOffsetResponse", streamId); for (YdbTopic.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset partitionCommittedOffset : response.getPartitionsCommittedOffsetsList()) { PartitionSessionImpl partitionSession = @@ -491,7 +491,7 @@ protected void onCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetRes partitionSession.getSessionInfo()); } else { logger.info("[{}] Received CommitOffsetResponse for unknown (most likely already closed) " + - "partition session with id={}", fullId, + "partition session with id={}", streamId, partitionCommittedOffset.getPartitionSessionId()); } } @@ -501,7 +501,7 @@ protected void onPartitionSessionStatusResponse( YdbTopic.StreamReadMessage.PartitionSessionStatusResponse response) { PartitionSessionImpl partitionSession = partitionSessions.get(response.getPartitionSessionId()); logger.info("[{}] Received PartitionSessionStatusResponse: partition session {} (partition {})." + - " Partition offsets: [{}, {}). Committed offset: {}", fullId, + " Partition offsets: [{}, {}). Committed offset: {}", streamId, response.getPartitionSessionId(), partitionSession == null ? "unknown" : partitionSession.getPartitionId(), response.getPartitionOffsets().getStart(), response.getPartitionOffsets().getEnd(), @@ -510,16 +510,16 @@ protected void onPartitionSessionStatusResponse( private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { if (!isWorking.get()) { - logger.debug("[{}] processMessage called, but read session is already closed", fullId); + logger.debug("[{}] processMessage called, but read session is already closed", streamId); return; } - logger.debug("[{}] processMessage called", fullId); + logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { reconnectCounter.set(0); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); - logger.warn("[{}] Got non-success status in processMessage method: {}", fullId, status); + logger.warn("[{}] Got non-success status in processMessage method: {}", streamId, status); closeDueToError(status, null); return; } @@ -537,14 +537,14 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { } else if (message.hasPartitionSessionStatusResponse()) { onPartitionSessionStatusResponse(message.getPartitionSessionStatusResponse()); } else if (message.hasUpdateTokenResponse()) { - logger.debug("[{}] Received UpdateTokenResponse", fullId); + logger.debug("[{}] Received UpdateTokenResponse", streamId); } else { - logger.error("[{}] Unhandled message from server: {}", fullId, message); + logger.error("[{}] Unhandled message from server: {}", streamId, message); } } protected void closeDueToError(Status status, Throwable th) { - logger.info("[{}] Session {} closeDueToError called", fullId, sessionId); + logger.info("[{}] Session {} closeDueToError called", streamId, sessionId); if (shutdown()) { // Signal reader to retry onSessionClosed(status, th); @@ -553,7 +553,7 @@ protected void closeDueToError(Status status, Throwable th) { @Override protected void onStop() { - logger.debug("[{}] Session {} onStop called", fullId, sessionId); + logger.debug("[{}] Session {} onStop called", streamId, sessionId); closePartitionSessions(); } } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java index 6d1618d1..eba3b5b6 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java @@ -14,8 +14,11 @@ public abstract class WriteSession extends SessionBase { private static final Logger logger = LoggerFactory.getLogger(WriteSession.class); - public WriteSession(TopicRpc rpc) { - super(rpc.writeSession()); + protected final String streamId; + + public WriteSession(TopicRpc rpc, String streamId) { + super(rpc.writeSession(streamId)); + this.streamId = streamId; } @Override diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 5c5e4852..24f4e108 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -337,18 +337,17 @@ protected void onShutdown(String reason) { private class WriteSessionImpl extends WriteSession { protected String sessionId = ""; - private final String fullId; private final MessageSender messageSender; private final AtomicBoolean isInitialized = new AtomicBoolean(false); private WriteSessionImpl() { - super(topicRpc); - this.fullId = id + '.' + sessionSeqNumberCounter.incrementAndGet(); + super(topicRpc, id + '.' + sessionSeqNumberCounter.incrementAndGet()); this.messageSender = new MessageSender(settings); } + @Override public void startAndInitialize() { - logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId); + logger.debug("[{}] Session {} startAndInitialize called", streamId, sessionId); start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamWriteMessage.InitRequest @@ -377,43 +376,43 @@ private void sendDataRequestIfNeeded() { while (true) { if (!isInitialized.get()) { logger.debug("[{}] Can't send data: current session is not yet initialized", - fullId); + streamId); return; } if (!isWorking.get()) { logger.debug("[{}] Can't send data: current session has been already stopped", - fullId); + streamId); return; } Queue messages; if (sendingQueue.isEmpty()) { - logger.trace("[{}] Nothing to send -- sendingQueue is empty", fullId); + logger.trace("[{}] Nothing to send -- sendingQueue is empty", streamId); return; } if (!writeRequestInProgress.compareAndSet(false, true)) { - logger.debug("[{}] Send request is already in progress", fullId); + logger.debug("[{}] Send request is already in progress", streamId); return; } // This code can be run in one thread at a time due to acquiring writeRequestInProgress messages = new LinkedList<>(sendingQueue); // Checking second time under writeRequestInProgress "lock" if (messages.isEmpty()) { - logger.debug("[{}] Nothing to send -- sendingQueue is empty #2", fullId); + logger.debug("[{}] Nothing to send -- sendingQueue is empty #2", streamId); } else { sendingQueue.removeAll(messages); sentMessages.addAll(messages); messageSender.sendMessages(messages); - logger.debug("[{}] Sent {} messages to server", fullId, messages.size()); + logger.debug("[{}] Sent {} messages to server", streamId, messages.size()); } if (!writeRequestInProgress.compareAndSet(true, false)) { - logger.error("[{}] Couldn't turn off writeRequestInProgress. Should not happen", fullId); + logger.error("[{}] Couldn't turn off writeRequestInProgress. Should not happen", streamId); } } } private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { sessionId = response.getSessionId(); - logger.info("[{}] Session {} initialized", fullId, sessionId); + logger.info("[{}] Session {} initialized", streamId, sessionId); long lastSeqNo = response.getLastSeqNo(); long actualLastSeqNo = lastSeqNo; // If there are messages that were already sent before reconnect but haven't received acks, @@ -440,7 +439,7 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { // Shouldn't be called more than once at a time due to grpc guarantees private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) { List acks = response.getAcksList(); - logger.debug("[{}] Received WriteResponse with {} WriteAcks", fullId, acks.size()); + logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size()); int inFlightFreed = 0; long bytesFreed = 0; for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) { @@ -459,7 +458,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) if (sentMessage.getSeqNo() < ack.getSeqNo()) { // An older message hasn't received an Ack while a newer message has logger.warn("[{}] Received an ack for seqNo {}, but the oldest seqNo waiting for ack is {}", - fullId, ack.getSeqNo(), sentMessage.getSeqNo()); + streamId, ack.getSeqNo(), sentMessage.getSeqNo()); sentMessage.getFuture().completeExceptionally( new RuntimeException("Didn't get ack from server for this message")); inFlightFreed++; @@ -468,7 +467,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) // Checking next message waiting for ack } else { logger.warn("[{}] Received an ack with seqNo {} which is older than the oldest message with " + - "seqNo {} waiting for ack", fullId, ack.getSeqNo(), sentMessage.getSeqNo()); + "seqNo {} waiting for ack", streamId, ack.getSeqNo(), sentMessage.getSeqNo()); break; } } @@ -477,13 +476,13 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) } private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) { - logger.debug("[{}] processMessage called", fullId); + logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { reconnectCounter.set(0); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); - logger.warn("[{}] Got non-success status in processMessage method: {}", fullId, status); + logger.warn("[{}] Got non-success status in processMessage method: {}", streamId, status); closeDueToError(status, null); return; } @@ -496,7 +495,7 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) { private void processWriteAck(EnqueuedMessage message, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) { - logger.debug("[{}] Received WriteAck with seqNo {} and status {}", fullId, ack.getSeqNo(), + logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(), ack.getMessageWriteStatusCase()); WriteAck resultAck; switch (ack.getMessageWriteStatusCase()) { @@ -526,7 +525,7 @@ private void processWriteAck(EnqueuedMessage message, } private void closeDueToError(Status status, Throwable th) { - logger.info("[{}] Session {} closeDueToError called", fullId, sessionId); + logger.info("[{}] Session {} closeDueToError called", streamId, sessionId); if (shutdown()) { // Signal writer to retry onSessionClosed(status, th); @@ -535,7 +534,7 @@ private void closeDueToError(Status status, Throwable th) { @Override protected void onStop() { - logger.debug("[{}] Session {} onStop called", fullId, sessionId); + logger.debug("[{}] Session {} onStop called", streamId, sessionId); } } }