Skip to content

Commit

Permalink
WIP reduce verbosity
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed May 3, 2024
1 parent 277e0b4 commit 87c6408
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.jetbrains.annotations.NotNull;

@Slf4j
Expand Down Expand Up @@ -351,7 +354,6 @@ public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String
if (traceLevel == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("producer", getProducerDetails(producer, traceSchema));
traceDetails.put("metadata", metadata);

Expand All @@ -365,10 +367,17 @@ public void producerClosed(ServerCnx cnx, Producer producer, Map<String, String>
if (traceLevel == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("producer", getProducerDetails(producer, traceSchema));
traceDetails.put("metadata", metadata);

PublisherStatsImpl stats = producer.getStats();
traceDetails.put("connectedSince", stats.getConnectedSince());
traceDetails.put("closedAt", DateFormatter.now());
traceDetails.put("averageMsgSize", stats.getAverageMsgSize());
traceDetails.put("msgRateIn", stats.getMsgRateIn());
traceDetails.put("msgThroughputIn", stats.getMsgThroughputIn());
// no message count in stats? stats.getCount() is not it

trace(EventReasons.ADMINISTRATIVE, "Producer closed", traceDetails);
}

Expand All @@ -379,7 +388,6 @@ public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String
if (traceLevel == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("consumer", getConsumerDetails(consumer));
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
traceDetails.put("metadata", metadata);
Expand All @@ -394,11 +402,22 @@ public void consumerClosed(ServerCnx cnx, Consumer consumer, Map<String, String>
if (traceLevel == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("consumer", getConsumerDetails(consumer));
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
traceDetails.put("metadata", metadata);

ConsumerStatsImpl stats = consumer.getStats();

traceDetails.put("connectedSince", stats.getConnectedSince());
traceDetails.put("closedAt", DateFormatter.now());
traceDetails.put("averageMsgSize", stats.getAvgMessagesPerEntry());
traceDetails.put("msgRateOut", stats.getMsgRateOut());
traceDetails.put("msgThroughputOut", stats.getMsgThroughputOut());
traceDetails.put("msgOutCounter", stats.getMsgOutCounter());
traceDetails.put("bytesOutCounter", stats.getBytesOutCounter());
traceDetails.put("unackedMessages", stats.getUnackedMessages());
traceDetails.put("messageAckRate", stats.getMessageAckRate());

trace(EventReasons.ADMINISTRATIVE, "Consumer closed", traceDetails);
}

Expand Down Expand Up @@ -452,7 +471,7 @@ public void beforeSendMessage(
traceDetails.put("entry", getEntryDetails(entry, maxBinaryDataLength));
traceDetails.put("messageMetadata", getMessageMetadataDetails(msgMetadata));

trace(EventReasons.MESSAGE, "Before sending message", traceDetails);
trace(EventReasons.MESSAGE, "Message read", traceDetails);
}

public void onMessagePublish(
Expand All @@ -472,7 +491,7 @@ public void onMessagePublish(
"headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength);
traceDetails.put("payload", headersAndPayloadDetails);

trace(EventReasons.MESSAGE, "Message publish", traceDetails);
trace(EventReasons.MESSAGE, "Message received", traceDetails);
}

public void messageProduced(
Expand All @@ -493,7 +512,7 @@ public void messageProduced(
traceDetails.put("publishContext", getPublishContextDetails(publishContext));
traceDetails.put("messageId", ledgerId + ":" + entryId);
traceDetails.put("startTimeNs", startTimeNs);
trace(EventReasons.MESSAGE, "Message produced", traceDetails);
trace(EventReasons.MESSAGE, "Message stored", traceDetails);
}

public void messageDispatched(
Expand All @@ -504,17 +523,18 @@ public void messageDispatched(
if (level == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("consumer", getConsumerDetails(consumer));
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
if (consumer != null) {
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
}
traceDetails.put("messageId", ledgerId + ":" + entryId);

Map<String, Object> headersAndPayloadDetails = new TreeMap<>();
traceByteBuf(
"headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength);
"headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength);
traceDetails.put("payload", headersAndPayloadDetails);

trace(EventReasons.MESSAGE, "After dispatching message", traceDetails);
trace(EventReasons.MESSAGE, "Message dispatched", traceDetails);
}

public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {
Expand All @@ -524,9 +544,10 @@ public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {
if (level == TraceLevel.OFF) return;

Map<String, Object> traceDetails = new TreeMap<>();
traceDetails.put("serverCnx", getConnectionDetails(cnx));
traceDetails.put("consumer", getConsumerDetails(consumer));
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
if (consumer != null) {
traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription()));
}

Map<String, Object> ackDetails = new TreeMap<>();
if (ackCmd.hasAckType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -100,7 +101,7 @@ public enum TraceLevel {
private static final LoadingCache<String, String> ipResolverCache =
CacheBuilder.newBuilder()
.maximumSize(10_000L)
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.expireAfterWrite(4, TimeUnit.HOURS)
.build(
new CacheLoader<String, String>() {
public String load(String clientAddress) {
Expand Down Expand Up @@ -464,18 +465,15 @@ private static void populateConnectionDetails(ServerCnx cnx, Map<String, Object>
if (cnx == null) {
return;
}

traceDetails.put("clientAddress", hostNameOf(cnx.clientSourceAddress()));
traceDetails.put("clientSocket", cnx.clientAddress());
traceDetails.put("clientHost", hostNameOf(cnx.clientSourceAddress()));
traceDetails.put("authRole", cnx.getAuthRole());
traceDetails.put("principal", cnx.getPrincipal());
traceDetails.put("clientVersion", cnx.getClientVersion());
traceDetails.put("clientSourceAddressAndPort", cnx.clientSourceAddressAndPort());
traceDetails.put("authMethod", cnx.getAuthMethod());
traceDetails.put(
"authMethodName",
cnx.getAuthenticationProvider() == null
? "no provider"
: cnx.getAuthenticationProvider().getAuthMethodName());
if (cnx.getAuthenticationProvider() != null) {
traceDetails.put("authMethodName", cnx.getAuthenticationProvider().getAuthMethodName());
}

AuthenticationDataSource authData = cnx.getAuthenticationData();
if (authData != null) {
Expand Down Expand Up @@ -534,10 +532,10 @@ private static void populateSubscriptionDetails(

if (sub.getConsumers() != null) {
traceDetails.put("numberOfConsumers", sub.getConsumers().size());
traceDetails.put(
"namesOfConsumers",
sub.getConsumers().stream().map(Consumer::consumerName).collect(Collectors.toList()));
}
traceDetails.put("isReplicated", sub.isReplicated());
traceDetails.put("numberOfEntriesDelayed", sub.getNumberOfEntriesDelayed());
traceDetails.put("numberOfEntriesInBacklog", sub.getNumberOfEntriesInBacklog(false));

traceDetails.put("subscriptionProperties", sub.getSubscriptionProperties());
}
Expand Down Expand Up @@ -567,9 +565,11 @@ private static void populateConsumerDetails(Consumer consumer, Map<String, Objec

traceDetails.put("priorityLevel", consumer.getPriorityLevel());
traceDetails.put("subType", consumer.subType() == null ? null : consumer.subType().name());
traceDetails.put("clientAddress", hostNameOf(consumer.getClientAddress()));
traceDetails.put("clientHost", hostNameOf(consumer.getClientAddress()));

traceDetails.put("metadata", consumer.getMetadata());
traceDetails.put("unackedMessages", consumer.getUnackedMessages());
traceDetails.put("authRole", consumer.cnx().getAuthRole());
}

public static Map<String, Object> getProducerDetails(Producer producer, boolean traceSchema) {
Expand Down Expand Up @@ -597,7 +597,7 @@ private static void populateProducerDetails(
"topicName", TopicName.get(producer.getTopic().getName()).getPartitionedTopicName());
}

traceDetails.put("clientAddress", hostNameOf(producer.getClientAddress()));
traceDetails.put("clientHost", hostNameOf(producer.getClientAddress()));

traceDetails.put("metadata", producer.getMetadata());

Expand All @@ -613,6 +613,8 @@ private static void populateProducerDetails(
traceDetails.put("schemaVersion", schemaVersion);
}
traceDetails.put("remoteCluster", producer.getRemoteCluster());

traceDetails.put("authRole", producer.getCnx().getAuthRole());
}

public static Map<String, Object> getMessageMetadataDetails(MessageMetadata msgMetadata) {
Expand Down

0 comments on commit 87c6408

Please sign in to comment.