diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index a246a8473e3..a045afb676e 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -83,15 +83,14 @@ public class DagActionStoreChangeMonitorTest { */ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { - public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, - boolean isMultiActiveSchedulerEnabled) { - this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class)); + public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads) { + this(topic, config, numThreads, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class)); } - public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled, + public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, DagManagementStateStore dagManagementStateStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) { super(topic, config, dagManager, numThreads, flowCatalog, orchestrator, - dagManagementStateStore, isMultiActiveSchedulerEnabled, mock(DagProcessingEngineMetrics.class)); + dagManagementStateStore, mock(DagProcessingEngineMetrics.class)); } protected void processMessageForTest(DecodeableKafkaRecord record) { @@ -108,7 +107,7 @@ MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer")) .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore")) .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121")); - return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true); + return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5); } // Called at start of every test so the count of each method being called is reset to 0 @@ -239,7 +238,7 @@ public void testStartupSequenceHandlesFailures() throws Exception { // Throw an uncaught exception during startup sequence when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception")); mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5, - true, dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator); + dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator); try { mockDagActionStoreChangeMonitor.setActive(); } catch (Exception e) { diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java index 261e686bd21..2715147377b 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java @@ -74,9 +74,9 @@ public class DagManagementDagActionStoreChangeMonitorTest { */ static class MockDagManagementDagActionStoreChangeMonitor extends DagManagementDagActionStoreChangeMonitor { - public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) { + public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads) { super(config, numThreads, mock(FlowCatalog.class), mock(Orchestrator.class), mock(DagManagementStateStore.class), - isMultiActiveSchedulerEnabled, mock(DagManagement.class), dagActionReminderScheduler, + mock(DagManagement.class), dagActionReminderScheduler, mock(DagProcessingEngineMetrics.class)); } protected void processMessageForTest(DecodeableKafkaRecord record) { @@ -89,7 +89,7 @@ MockDagManagementDagActionStoreChangeMonitor createMockDagManagementDagActionSto .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer")) .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore")) .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121")); - return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true); + return new MockDagManagementDagActionStoreChangeMonitor(config, 5); } // Called at start of every test so the count of each method being called is reset to 0 diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java index 54f8779b29f..826cf54e31f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java @@ -43,9 +43,6 @@ public class GobblinServiceConfiguration { @Getter private final boolean isWarmStandbyEnabled; - @Getter - private final boolean isMultiActiveSchedulerEnabled; - @Getter private final boolean isMultiActiveExecutionEnabled; @@ -108,7 +105,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config } this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false); - this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false); this.isMultiActiveExecutionEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED, false); this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index 508dcb6d8e9..eb997bc854e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -165,9 +165,6 @@ public void configure(Binder binder) { binder.bindConstant() .annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED)) .to(serviceConfig.isWarmStandbyEnabled()); - binder.bindConstant() - .annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED)) - .to(serviceConfig.isMultiActiveSchedulerEnabled()); binder.bindConstant() .annotatedWith(Names.named(InjectionNames.DAG_PROC_ENGINE_ENABLED)) .to(serviceConfig.isDagProcessingEngineEnabled()); @@ -198,9 +195,6 @@ binding time (optionally bound classes cannot have names associated with them), ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)).toProvider( FlowLaunchMultiActiveLeaseArbiterFactory.class); OptionalBinder.newOptionalBinder(binder, FlowLaunchHandler.class); - if (serviceConfig.isMultiActiveSchedulerEnabled()) { - binder.bind(FlowLaunchHandler.class); - } OptionalBinder.newOptionalBinder(binder, DagManagement.class); OptionalBinder.newOptionalBinder(binder, DagTaskStream.class); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index ba6d4f0c46d..83b84067cf6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -324,11 +324,6 @@ private void handleLeadershipChange(NotificationContext changeContext) { LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(), this.helixManager.get().isLeader()); - if (configuration.isSchedulerEnabled()) { - LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler."); - this.scheduler.setActive(true); - } - if (helixLeaderGauges.isPresent()) { helixLeaderGauges.get().setState(LeaderState.MASTER); } @@ -354,12 +349,6 @@ private void handleLeadershipChange(NotificationContext changeContext) { LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(), this.helixManager.get().isLeader()); - if (configuration.isSchedulerEnabled() && !configuration.isMultiActiveSchedulerEnabled()) { - LOGGER.info("Gobblin Service is now running in non-leader mode without multi-active scheduler enabled, " - + "disabling Scheduler."); - this.scheduler.setActive(false); - } - if (helixLeaderGauges.isPresent()) { helixLeaderGauges.get().setState(LeaderState.SLAVE); } @@ -467,15 +456,13 @@ public void start() throws ApplicationException { if (this.helixManager.isPresent()) { // Subscribe to leadership changes this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange); - + if (configuration.isSchedulerEnabled()) { + LOGGER.info("[Init] Gobblin service is running in multi active mode, enabling Scheduler."); + this.scheduler.setActive(true); + } // Update for first time since there might be no notification if (helixManager.get().isLeader()) { - if (configuration.isSchedulerEnabled()) { - LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler."); - this.scheduler.setActive(true); - } - if (configuration.isGitConfigMonitorEnabled()) { this.gitConfigMonitor.setActive(true); } @@ -485,14 +472,6 @@ public void start() throws ApplicationException { } } else { - if (configuration.isSchedulerEnabled()) { - if (configuration.isMultiActiveSchedulerEnabled()) { - LOGGER.info("[Init] Gobblin Service enabling scheduler for non-leader since multi-active scheduler enabled"); - this.scheduler.setActive(true); - } else { - LOGGER.info("[Init] Gobblin Service is running in non-leader instance mode, not enabling Scheduler."); - } - } if (helixLeaderGauges.isPresent()) { helixLeaderGauges.get().setState(LeaderState.SLAVE); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index bb95d2481f0..03d84f8bae1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -96,7 +96,6 @@ public String load(String key) throws Exception { @VisibleForTesting protected DagManager dagManager; protected Orchestrator orchestrator; - protected boolean isMultiActiveSchedulerEnabled; @Getter @VisibleForTesting protected FlowCatalog flowCatalog; @@ -109,7 +108,7 @@ public String load(String key) throws Exception { // client itself to determine all Kafka related information dynamically rather than through the config. public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore, - boolean isMultiActiveSchedulerEnabled, DagProcessingEngineMetrics dagProcEngineMetrics) { + DagProcessingEngineMetrics dagProcEngineMetrics) { // Differentiate group id for each host super(topic, config.withValue(GROUP_ID_KEY, ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())), @@ -118,7 +117,6 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagMa this.flowCatalog = flowCatalog; this.orchestrator = orchestrator; this.dagManagementStateStore = dagManagementStateStore; - this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled; this.dagProcEngineMetrics = dagProcEngineMetrics; /* @@ -282,11 +280,6 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta this.killsInvoked.mark(); } else if (dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) { // If multi-active scheduler is NOT turned on we should not receive these type of events - if (!this.isMultiActiveSchedulerEnabled) { - this.unexpectedErrors.mark(); - throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler " - + "mode for flowAction: %s", dagAction)); - } submitFlowToDagManagerHelper(dagAction, isStartup); } else { log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagAction.getDagActionType()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java index b3925602511..e216f8d096d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java @@ -22,12 +22,10 @@ import com.typesafe.config.Config; import javax.inject.Inject; -import javax.inject.Named; import javax.inject.Provider; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.runtime.util.InjectionNames; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; @@ -47,20 +45,17 @@ public class DagActionStoreChangeMonitorFactory implements Provider