Skip to content

Commit

Permalink
remote vs local counter to better observe how many internal and exter…
Browse files Browse the repository at this point in the history
…nal messages is being processed (#14617)

CAMEL-20879: camel-core: remote vs local endpoints counters
  • Loading branch information
davsclaus authored Jun 22, 2024
1 parent 1a6a166 commit 87c8e83
Show file tree
Hide file tree
Showing 21 changed files with 260 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ interface InflightExchange {
*/
String getFromRouteId();

/**
* Whether the endpoint is remote where the exchange originates (started)
*/
boolean isFromRemoteEndpoint();

/**
* The id of the route where the exchange currently is being processed
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ public String getFromRouteId() {
return exchange.getFromRouteId();
}

@Override
public boolean isFromRemoteEndpoint() {
if (exchange.getFromEndpoint() != null) {
return exchange.getFromEndpoint().isRemote();
}
return false;
}

@Override
public String getAtRouteId() {
return ExchangeHelper.getAtRouteId(exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ protected String doCallText(Map<String, Object> options) {
sb.append(String.format("\n Uri: %s", mc.getEndpointUri()));
sb.append(String.format("\n State: %s", mc.getState()));
sb.append(String.format("\n Class: %s", mc.getServiceType()));
sb.append(String.format("\n Remote: %b", mc.isRemoteEndpoint()));
sb.append(String.format("\n Hosted: %b", mc.isHostedService()));
sb.append(String.format("\n Inflight: %d", inflight));
if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
sb.append(String.format("\n Polling: %s", mpc.isPolling()));
Expand All @@ -74,9 +76,9 @@ protected String doCallText(Map<String, Object> options) {
sb.append(String.format("\n Greedy: %s", mpc.isGreedy()));
sb.append(String.format("\n Running Logging Level: %s", mpc.getRunningLoggingLevel()));
sb.append(String.format("\n Send Empty Message When Idle: %s", mpc.isSendEmptyMessageWhenIdle()));
sb.append(String.format("\n Counter(total: %d success: %d error: %d)",
sb.append(String.format("\n Counter (total: %d success: %d error: %d)",
mpc.getCounter(), mpc.getSuccessCounter(), mpc.getErrorCounter()));
sb.append(String.format("\n Delay(initial: %d delay: %d unit: %s)",
sb.append(String.format("\n Delay (initial: %d delay: %d unit: %s)",
mpc.getInitialDelay(), mpc.getDelay(), mpc.getTimeUnit()));
sb.append(String.format(
"\n Backoff(counter: %d multiplier: %d errorThreshold: %d, idleThreshold: %d )",
Expand Down Expand Up @@ -113,7 +115,7 @@ protected String doCallText(Map<String, Object> options) {
sb.append(String.format("\n Repeat Count: %s", repeatCount));
}
sb.append(String.format("\n Running Logging Level: %s", runLoggingLevel));
sb.append(String.format("\n Counter(total: %s)", counter));
sb.append(String.format("\n Counter (total: %s)", counter));

}
} catch (Exception e) {
Expand Down Expand Up @@ -150,6 +152,8 @@ protected JsonObject doCallJson(Map<String, Object> options) {
jo.put("uri", mc.getEndpointUri());
jo.put("state", mc.getState());
jo.put("class", mc.getServiceType());
jo.put("remote", mc.isRemoteEndpoint());
jo.put("hosted", mc.isHostedService());
jo.put("inflight", inflight);
jo.put("scheduled", false);
if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ public ContextDevConsole() {
protected String doCallText(Map<String, Object> options) {
StringBuilder sb = new StringBuilder();

sb.append(String.format("Apache Camel %s %s (%s) uptime %s", getCamelContext().getVersion(),
getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), getCamelContext().getName(),
CamelContextHelper.getUptime(getCamelContext())));
String profile = "";
if (getCamelContext().getCamelContextExtension().getProfile() != null) {
sb.append(String.format("\n Profile: %s", getCamelContext().getCamelContextExtension().getProfile()));
profile = " (profile: " + getCamelContext().getCamelContextExtension().getProfile() + ")";
}
sb.append(String.format("Apache Camel %s %s (%s)%s uptime %s", getCamelContext().getVersion(),
getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), getCamelContext().getName(),
profile, CamelContextHelper.getUptime(getCamelContext())));
if (getCamelContext().getDescription() != null) {
sb.append(String.format("\n %s", getCamelContext().getDescription()));
}
Expand All @@ -70,9 +71,9 @@ protected String doCallText(Map<String, Object> options) {
if (!thp.isEmpty()) {
sb.append(String.format("\n Messages/Sec: %s", thp));
}
sb.append(String.format("\n Total: %s", mb.getExchangesTotal()));
sb.append(String.format("\n Failed: %s", mb.getExchangesFailed()));
sb.append(String.format("\n Inflight: %s", mb.getExchangesInflight()));
sb.append(String.format("\n Total: %s/%s", mb.getRemoteExchangesTotal(), mb.getExchangesTotal()));
sb.append(String.format("\n Failed: %s/%s", mb.getRemoteExchangesFailed(), mb.getExchangesFailed()));
sb.append(String.format("\n Inflight: %s/%s", mb.getRemoteExchangesInflight(), mb.getExchangesInflight()));
long idle = mb.getIdleSince();
if (idle > 0) {
sb.append(String.format("\n Idle Since: %s", TimeUtils.printDuration(idle)));
Expand Down Expand Up @@ -151,6 +152,9 @@ protected JsonObject doCallJson(Map<String, Object> options) {
stats.put("exchangesTotal", mb.getExchangesTotal());
stats.put("exchangesFailed", mb.getExchangesFailed());
stats.put("exchangesInflight", mb.getExchangesInflight());
stats.put("remoteExchangesTotal", mb.getRemoteExchangesTotal());
stats.put("remoteExchangesFailed", mb.getRemoteExchangesFailed());
stats.put("remoteExchangesInflight", mb.getRemoteExchangesInflight());
stats.put("reloaded", reloaded);
stats.put("meanProcessingTime", mb.getMeanProcessingTime());
stats.put("maxProcessingTime", mb.getMaxProcessingTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected String doCallText(Map<String, Object> options) {
if (!col.isEmpty()) {
for (Endpoint e : col) {
boolean stub = e.getComponent().getClass().getSimpleName().equals("StubComponent");
boolean remote = e.isRemote();
String uri = e.toString();
if (!uri.startsWith("stub:") && stub) {
// shadow-stub
Expand All @@ -62,9 +63,10 @@ protected String doCallText(Map<String, Object> options) {
var stat = findStats(stats, e.getEndpointUri());
if (stat.isPresent()) {
var st = stat.get();
sb.append(String.format("\n %s (direction: %s, usage: %s)", uri, st.getDirection(), st.getHits()));
sb.append(String.format("\n %s (remote: %s direction: %s, usage: %s)", uri, remote, st.getDirection(),
st.getHits()));
} else {
sb.append(String.format("\n %s", uri));
sb.append(String.format("\n %s (remote: %s)", uri, remote));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ protected String doCallText(Map<String, Object> options) {
if (repo.isInflightBrowseEnabled()) {
for (InflightRepository.InflightExchange ie : repo.browse(filter, max, false)) {
String age = TimeUtils.printDuration(ie.getDuration(), true);
sb.append(String.format("\n %s (from: %s at: %s/%s age: %s)",
ie.getExchange().getExchangeId(), ie.getFromRouteId(), ie.getAtRouteId(), ie.getNodeId(), age));
sb.append(String.format("\n %s (from: %s at: %s/%s remote: %b age: %s)",
ie.getExchange().getExchangeId(), ie.getFromRouteId(), ie.getAtRouteId(), ie.getNodeId(),
ie.isFromRemoteEndpoint(), age));
}
}

Expand All @@ -82,6 +83,7 @@ protected JsonObject doCallJson(Map<String, Object> options) {
JsonObject props = new JsonObject();
props.put("exchangeId", ie.getExchange().getExchangeId());
props.put("fromRouteId", ie.getFromRouteId());
props.put("fromRemoteEndpoint", ie.isFromRemoteEndpoint());
props.put("atRouteId", ie.getAtRouteId());
props.put("nodeId", ie.getNodeId());
props.put("elapsed", ie.getElapsed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ protected String doCallText(Map<String, Object> options) {
sb.append(String.format(" Node Prefix Id: %s", mrb.getNodePrefixId()));
}
sb.append(String.format("\n From: %s", mrb.getEndpointUri()));
sb.append(String.format("\n Remote: %s", mrb.isRemoteEndpoint()));
if (mrb.getSourceLocation() != null) {
sb.append(String.format("\n Source: %s", mrb.getSourceLocation()));
}
Expand Down Expand Up @@ -233,6 +234,7 @@ protected JsonObject doCallJson(Map<String, Object> options) {
jo.put("nodePrefixId", mrb.getNodePrefixId());
}
jo.put("from", mrb.getEndpointUri());
jo.put("remote", mrb.isRemoteEndpoint());
if (mrb.getSourceLocation() != null) {
jo.put("source", mrb.getSourceLocation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean
@ManagedAttribute(description = "Throughput message/second")
String getThroughput();

@ManagedAttribute(description = "Total number of exchanges processed from remote endpoints only")
long getRemoteExchangesTotal();

@ManagedAttribute(description = "Completed (success) number of exchanges processed from remote endpoints only")
long getRemoteExchangesCompleted();

@ManagedAttribute(description = "Failed number of exchanges processed from remote endpoints only")
long getRemoteExchangesFailed();

@ManagedAttribute(description = "Total number of exchanges inflight from remote endpoints only")
long getRemoteExchangesInflight();

@ManagedAttribute(description = "Whether breadcrumbs is in use")
boolean isUseBreadcrumb();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ public interface ManagedConsumerMBean extends ManagedServiceMBean {
@ManagedAttribute(description = "Whether this consumer hosts a service such as acting as a HTTP server (only available for some components)")
boolean isHostedService();

@ManagedAttribute(description = "Whether this consumer connects to remote or local systems")
boolean isRemoteEndpoint();

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface ManagedEndpointMBean {
@ManagedAttribute(description = "Singleton")
boolean isSingleton();

@ManagedAttribute(description = "Remote")
@ManagedAttribute(description = "Whether this endpoint connects to remote or local systems")
boolean isRemote();

@ManagedAttribute(description = "Endpoint State")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ public interface ManagedProducerMBean extends ManagedServiceMBean {
@ManagedAttribute(description = "Singleton")
boolean isSingleton();

@ManagedAttribute(description = "Whether this producer connects to remote or local systems")
boolean isRemoteEndpoint();

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,7 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
@ManagedAttribute(description = "Whether update route from XML is enabled")
boolean isUpdateRouteEnabled();

@ManagedAttribute(description = "Whether the consumer connects to remote or local systems")
boolean isRemoteEndpoint();

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
private final String jmxDomain;
private final boolean includeRouteTemplates;
private final boolean includeKamelets;
private Statistic remoteExchangesTotal;
private Statistic remoteExchangesCompleted;
private Statistic remoteExchangesFailed;
private Statistic remoteExchangesInflight;

public ManagedCamelContext(CamelContext context) {
this.context = context;
Expand All @@ -75,11 +79,24 @@ public ManagedCamelContext(CamelContext context) {
@Override
public void init(ManagementStrategy strategy) {
super.init(strategy);
this.remoteExchangesTotal = new StatisticCounter();
this.remoteExchangesCompleted = new StatisticCounter();
this.remoteExchangesFailed = new StatisticCounter();
this.remoteExchangesInflight = new StatisticCounter();
boolean enabled = context.getManagementStrategy().getManagementAgent() != null
&& context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
setStatisticsEnabled(enabled);
}

@Override
public void reset() {
super.reset();
remoteExchangesTotal.reset();
remoteExchangesCompleted.reset();
remoteExchangesFailed.reset();
remoteExchangesInflight.reset();
}

@Override
public void completedExchange(Exchange exchange, long time) {
// the camel-context mbean is triggered for every route mbean
Expand All @@ -91,9 +108,19 @@ public void completedExchange(Exchange exchange, long time) {
int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets);
if (level <= 1) {
super.completedExchange(exchange, time);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesTotal.increment();
remoteExchangesCompleted.increment();
remoteExchangesInflight.decrement();
}
}
} else {
super.completedExchange(exchange, time);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesTotal.increment();
remoteExchangesCompleted.increment();
remoteExchangesInflight.decrement();
}
}
}

Expand All @@ -108,9 +135,19 @@ public void failedExchange(Exchange exchange) {
int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets);
if (level <= 1) {
super.failedExchange(exchange);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesTotal.increment();
remoteExchangesFailed.increment();
remoteExchangesInflight.decrement();
}
}
} else {
super.failedExchange(exchange);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesTotal.increment();
remoteExchangesFailed.increment();
remoteExchangesInflight.decrement();
}
}
}

Expand All @@ -125,9 +162,15 @@ public void processExchange(Exchange exchange, String type) {
int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets);
if (level <= 1) {
super.processExchange(exchange, type);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesInflight.increment();
}
}
} else {
super.processExchange(exchange, type);
if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
remoteExchangesInflight.increment();
}
}
}

Expand Down Expand Up @@ -330,6 +373,26 @@ public String getThroughput() {
}
}

@Override
public long getRemoteExchangesTotal() {
return remoteExchangesTotal.getValue();
}

@Override
public long getRemoteExchangesCompleted() {
return remoteExchangesCompleted.getValue();
}

@Override
public long getRemoteExchangesFailed() {
return remoteExchangesFailed.getValue();
}

@Override
public long getRemoteExchangesInflight() {
return remoteExchangesInflight.getValue();
}

@Override
public boolean isUseBreadcrumb() {
return context.isUseBreadcrumb();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public boolean isHostedService() {
}
return false;
}

@Override
public boolean isRemoteEndpoint() {
return consumer.getEndpoint().isRemote();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ public boolean isSingleton() {
return producer.isSingleton();
}

@Override
public boolean isRemoteEndpoint() {
return producer.getEndpoint().isRemote();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,14 @@ public boolean isUpdateRouteEnabled() {
return enabled != null ? enabled : false;
}

@Override
public boolean isRemoteEndpoint() {
if (route.getEndpoint() != null) {
return route.getEndpoint().isRemote();
}
return false;
}

@Override
public boolean equals(Object o) {
return this == o || o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route);
Expand Down
Loading

0 comments on commit 87c8e83

Please sign in to comment.