Skip to content

Commit

Permalink
:fix: (metrics) add publish and subscribe time (metrics created but n…
Browse files Browse the repository at this point in the history
…ot linked to the code)

Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Apr 16, 2024
1 parent 27a4ee6 commit 67f1718
Showing 1 changed file with 47 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public void afterDestroyConnection(RemotingConnection connection) throws ActiveM
*/
@Override
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
subscribeMetric.getAllowedMessages().inc();
ActiveMQServerPlugin.super.afterCreateConsumer(consumer);
Context subscribeContext = subscribeMetric.getTime().time();
try {
subscribeMetric.getAllowedMessages().inc();
ActiveMQServerPlugin.super.afterCreateConsumer(consumer);
}
finally {
subscribeContext.stop();
}
}

/**
Expand All @@ -220,41 +226,47 @@ public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQExceptio
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
String address = message.getAddress();
int messageSize = message.getEncodeSize();
SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection()));
logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName());
message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession())));
message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond());
message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address));
if (!sessionContext.isInternal()) {
if (isLwt(address)) {
//handle the missing message case
logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId());
sessionContext.setMissing(true);
}
// FIX #164
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId())));
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false);
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}",
messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId());
}
publishMetric.getMessageSizeAllowed().update(messageSize);
} else {
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {}",
messageSize, address);
Context sendContext = publishMetric.getTime().time();
try {
String address = message.getAddress();
int messageSize = message.getEncodeSize();
SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection()));
logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName());
message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession())));
message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond());
message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address));
if (!sessionContext.isInternal()) {
if (isLwt(address)) {
//handle the missing message case
logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId());
sessionContext.setMissing(true);
}
// FIX #164
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId())));
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false);
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}",
messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId());
}
publishMetric.getMessageSizeAllowed().update(messageSize);
} else {
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {}",
messageSize, address);
}
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true);
publishMetric.getMessageSizeAllowedInternal().update(messageSize);
}
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true);
publishMetric.getMessageSizeAllowedInternal().update(messageSize);
message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address);
serverContext.getAddressAccessTracker().update(address);
logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue);
}
finally {
sendContext.stop();
}
message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address);
serverContext.getAddressAccessTracker().update(address);
logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue);
}

private boolean isLwt(String originalTopic) {
Expand Down

0 comments on commit 67f1718

Please sign in to comment.