diff --git a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ServerPlugin.java b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ServerPlugin.java index cc4ac2a4f67..db7f7a14491 100644 --- a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ServerPlugin.java +++ b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/ServerPlugin.java @@ -210,8 +210,14 @@ public void afterDestroyConnection(RemotingConnection connection) throws ActiveM */ @Override public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { - subscribeMetric.getAllowedMessages().inc(); - ActiveMQServerPlugin.super.afterCreateConsumer(consumer); + Context subscribeContext = subscribeMetric.getTime().time(); + try { + subscribeMetric.getAllowedMessages().inc(); + ActiveMQServerPlugin.super.afterCreateConsumer(consumer); + } + finally { + subscribeContext.stop(); + } } /** @@ -220,41 +226,47 @@ public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQExceptio @Override public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { - String address = message.getAddress(); - int messageSize = message.getEncodeSize(); - SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection())); - logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp()); - message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId()); - message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName()); - message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession()))); - message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond()); - message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address)); - if (!sessionContext.isInternal()) { - if (isLwt(address)) { - //handle the missing message case - logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId()); - sessionContext.setMissing(true); - } - // FIX #164 - message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId()))); - message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false); - if (publishInfoMessageSizeLimit < messageSize) { - logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}", - messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId()); - } - publishMetric.getMessageSizeAllowed().update(messageSize); - } else { - if (publishInfoMessageSizeLimit < messageSize) { - logger.info("Published message size over threshold. size: {} - destination: {}", - messageSize, address); + Context sendContext = publishMetric.getTime().time(); + try { + String address = message.getAddress(); + int messageSize = message.getEncodeSize(); + SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection())); + logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp()); + message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId()); + message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName()); + message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession()))); + message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond()); + message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address)); + if (!sessionContext.isInternal()) { + if (isLwt(address)) { + //handle the missing message case + logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId()); + sessionContext.setMissing(true); + } + // FIX #164 + message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId()))); + message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false); + if (publishInfoMessageSizeLimit < messageSize) { + logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}", + messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId()); + } + publishMetric.getMessageSizeAllowed().update(messageSize); + } else { + if (publishInfoMessageSizeLimit < messageSize) { + logger.info("Published message size over threshold. size: {} - destination: {}", + messageSize, address); + } + message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true); + publishMetric.getMessageSizeAllowedInternal().update(messageSize); } - message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true); - publishMetric.getMessageSizeAllowedInternal().update(messageSize); + message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address); + serverContext.getAddressAccessTracker().update(address); + logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp()); + ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue); + } + finally { + sendContext.stop(); } - message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address); - serverContext.getAddressAccessTracker().update(address); - logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp()); - ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue); } private boolean isLwt(String originalTopic) {