diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 866644f50f0..780086c1e69 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -51,6 +51,7 @@ public class ServiceMetricNames { public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; public static final String DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; + public static final String DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".successfulLaunchEventsOnStartupCount"; public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; //Job status poll timer diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index d3b25f54e17..473c6ad0ee9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -84,6 +84,10 @@ public Dag compileFlow(Spec spec) { List jobExecutionPlans; try { jobExecutionPlans = getJobExecutionPlans(source, destination, jobSpec); + if (jobExecutionPlans.isEmpty()) { + flowSpec.addCompilationError(source, destination, + String.format("Could not find path between source: %s and destination: %s", source, destination)); + } } catch (InterruptedException | ExecutionException e) { Instrumented.markMeter(this.flowCompilationFailedMeter); throw new RuntimeException("Cannot determine topology capabilities", e); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index c90841b50a1..0b2440212b5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -507,6 +507,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec, Optional.absent()); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); + this.dagManagerMetrics.incrementSuccessfulLaunchCount(); } else { log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); this.dagManagerMetrics.incrementFailedLaunchCount(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 6d6c545b5b4..79bc5bf1d5d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -76,8 +76,9 @@ public class DagManagerMetrics { private final Map executorSlaExceededMeters = Maps.newConcurrentMap(); private final Map executorJobSentMeters = Maps.newConcurrentMap(); - // Metrics for unexpected flow handling failures - private ContextAwareCounter failedLaunchEventsOnActivationCount; + // Metric for unexpected flow handling outcomes + private ContextAwareCounter failedLaunchEventsOnStartupCount; + private ContextAwareCounter successfulLaunchEventsOnStartupCount; MetricContext metricContext; public DagManagerMetrics(MetricContext metricContext) { @@ -103,9 +104,12 @@ public void activate() { ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER)); allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR)); - failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter( + // TODO: remove duplicate use of 'GOBBLIN_SERVICE_PREFIX' once startup functionality is stable + failedLaunchEventsOnStartupCount = metricContext.contextAwareCounter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT)); + successfulLaunchEventsOnStartupCount = metricContext.contextAwareCounter( + ServiceMetricNames.DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT); } } @@ -205,10 +209,21 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode node) } } - // Increment the count for num of failed launches during leader activation + /** + * Increment the count for num of failed launches during system startup + */ public void incrementFailedLaunchCount() { if (this.metricContext != null) { - this.failedLaunchEventsOnActivationCount.inc(); + this.failedLaunchEventsOnStartupCount.inc(); + } + } + + /** + * Increment the count for num of successful launches during system startup + */ + public void incrementSuccessfulLaunchCount() { + if (this.metricContext != null) { + this.successfulLaunchEventsOnStartupCount.inc(); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 78b5446bf7c..f19abee927a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -86,10 +86,16 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); if (!jobExecutionPlanDagOptional.isPresent()) { + log.warn("No dag execution plan created for flowGroup: {} flowName: {}", + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD), + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)); return Optional.absent(); } if (jobExecutionPlanDagOptional.get() == null || jobExecutionPlanDagOptional.get().isEmpty()) { + log.warn("Null or empty dag execution plan created for flowGroup: {} flowName: {}", + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD), + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)); populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); return Optional.absent(); }