diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index da57f1a1a..c0538f38a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -447,7 +447,18 @@ public void onJobClusterCreate(final CreateJobClusterRequest request) { try { Optional jobClusterInfoO = jobClusterInfoManager.createClusterActorAndRegister(request.getJobClusterDefinition()); if (jobClusterInfoO.isPresent()) { - jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), request.getUser(), getSender())); + // Check if the job cluster name used to exist but was deleted + // If it was, use the last known job number from that cluster instead of starting from 0 + // This ensures that there are no conflicts between archived jobs and old jobs + // e.g. If a job cluster with the name "MyJobCluster" was deleted, but had run a job, then + // we'd have a Job ID of `MyJobCluster-1` in the archived jobs table. When the new job cluster + // came online it may partially overwrite the old archived job if we started a new job with ID MyJobCluster-1. + List completedJobs = jobStore.loadCompletedJobsForCluster(request.getJobClusterDefinition().getName(), 1, null); + long lastJobNum = completedJobs.stream() + .map((job) -> JobId.fromId(job.getJobId()).map(JobId::getJobNum).orElse(0L)) + .max(Long::compareTo) + .orElse(0L); + jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), lastJobNum, request.getUser(), getSender())); } else { getSender().tell(new CreateJobClusterResponse( request.requestId, CLIENT_ERROR, diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java index 594b1e528..1396c37d1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java @@ -74,11 +74,12 @@ public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefi /** * Invoked during Job Cluster Creation * @param jobClusterDefinition + * @param lastJobNumber * @param user * @param requestor */ - public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, String user, ActorRef requestor) { - this(jobClusterDefinition, false, 0, Lists.newArrayList(), user, requestor, true); + public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, long lastJobNumber, String user, ActorRef requestor) { + this(jobClusterDefinition, false, lastJobNumber, Lists.newArrayList(), user, requestor, true); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index ae68d5d27..e927145fc 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -343,7 +343,7 @@ public void testJobClusterCreate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(name); ActorRef jobClusterActor = system.actorOf(props(name, jobStoreMock, schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -390,7 +390,7 @@ public void testJobClusterEnable() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -461,7 +461,7 @@ public void testJobClusterUpdateAndDelete() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -528,7 +528,7 @@ public void testJobClusterUpdateFailsIfArtifactNotUnique() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -556,7 +556,7 @@ public void testJobClusterDeleteFailsIfJobsActive() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -593,7 +593,7 @@ public void testJobClusterDeletePurgesCompletedJobs() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -636,7 +636,7 @@ public void testJobClusterDisable() throws InterruptedException { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -731,7 +731,7 @@ public void testJobClusterSLAUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); SLA newSLA = new SLA(0,10,null,null); @@ -764,7 +764,7 @@ public void testJobClusterMigrationConfigUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); WorkerMigrationConfig newConfig = new WorkerMigrationConfig(MigrationStrategyEnum.ONE_WORKER, "{'name':'value'}"); @@ -797,7 +797,7 @@ public void testJobClusterArtifactUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -836,7 +836,7 @@ public void testJobClusterArtifactUpdateNotUniqueFails() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -873,7 +873,7 @@ public void testJobClusterArtifactUpdateMultipleTimes() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -941,7 +941,7 @@ public void testJobClusterInvalidSLAUpdateIgnored() throws Exception { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -976,7 +976,7 @@ public void testJobClusterLabelsUpdate() throws Exception { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); // assert initially no labels @@ -1032,7 +1032,7 @@ public void testJobSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1070,7 +1070,7 @@ public void testJobSubmitWithNoJarAndSchedInfo() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1123,7 +1123,7 @@ public void testJobSubmitWithVersionAndNoSchedInfo() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1231,7 +1231,7 @@ public void testJobComplete() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1272,7 +1272,7 @@ public void testJobKillTriggersSLAToLaunchNew() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -1328,7 +1328,7 @@ public void testJobSubmitTriggersSLAToKillOld() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -1410,7 +1410,7 @@ public void testJobSubmitTriggersSLAToKillOldHandlesErrors() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1493,7 +1493,7 @@ public void testCronTriggersSLAToKillOld() { SLA sla = new SLA(1,1,"0/1 * * * * ?",IJobClusterDefinition.CronPolicy.KEEP_NEW); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1543,7 +1543,7 @@ public void testJobSubmitWithUnique() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1592,7 +1592,7 @@ public void testJobSubmitWithoutInheritInstance() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1644,7 +1644,7 @@ public void testJobSubmitWithInheritInstanceFlagsSingleStage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1710,7 +1710,7 @@ public void testJobSubmitWithInheritInstanceFlagsMultiStage() { createFakeJobClusterDefn(clusterName, Lists.newArrayList(), NO_OP_SLA, schedulingInfo1); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest( - fakeJobCluster, user, probe.getRef()), probe.getRef()); + fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1786,7 +1786,7 @@ public void testJobSubmitWithInheritInstanceFlagsScaled() { createFakeJobClusterDefn(clusterName, Lists.newArrayList(), NO_OP_SLA, schedulingInfo1); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest( - fakeJobCluster, user, probe.getRef()), probe.getRef()); + fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1854,7 +1854,7 @@ public void testQuickJobSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1895,7 +1895,7 @@ public void testQuickJobSubmitWithNoSchedInfoInPreviousJob() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1949,7 +1949,7 @@ public void testJobSubmitWithNoSchedInfoUsesJobClusterValues() { clusterLabels.add(label); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, clusterLabels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2026,7 +2026,7 @@ public void testQuickJobSubmitWithNoPreviousHistoryFails() { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2061,7 +2061,7 @@ public void testUpdateJobClusterArtifactWithAutoSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2132,7 +2132,7 @@ public void testJobSubmitFails() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); Mockito.doThrow(Exception.class).when(jobStoreMock).storeNewJob(any()); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2168,7 +2168,7 @@ public void testGetLastSubmittedJobSubject() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2224,7 +2224,7 @@ public void testGetLastSubmittedJobSubjectWithWrongClusterNameFails() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2268,7 +2268,7 @@ public void testListArchivedWorkers() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStore, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2327,7 +2327,7 @@ public void testZombieWorkerKilledOnMessage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2361,7 +2361,7 @@ public void testZombieWorkerTerminateEventIgnored() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2388,7 +2388,7 @@ public void testResubmitWorker() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2448,7 +2448,7 @@ public void testScaleStage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2514,7 +2514,7 @@ public void testGetJobDetailsForArchivedJob() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -2566,7 +2566,7 @@ public void testListJobIdsForCluster() throws InvalidJobException { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2674,7 +2674,7 @@ public void testListJobsForCluster() throws InvalidJobException, InterruptedExce MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2849,7 +2849,7 @@ public void testListJobWithLabelMatch() { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2952,7 +2952,7 @@ public void testLostWorkerGetsReplaced() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreSpied, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java index b388994ec..c3095cba7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java @@ -107,6 +107,7 @@ import java.net.MalformedURLException; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -856,9 +857,7 @@ public void testJobClusterCreate() throws MalformedURLException { GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp2.responseCode); assertEquals(clusterName, resp2.getJobCluster().get().getName()); - - //assertEquals(jobClusterManagerActor, probe.getLastSender().path()); - + assertEquals(0L, resp2.getJobCluster().get().getLastJobCount()); } @Test @@ -879,6 +878,7 @@ public void testJobClusterCreateDupFails() throws MalformedURLException { GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp2.responseCode); assertEquals(clusterName, resp2.getJobCluster().get().getName()); + assertEquals(0L, resp2.getJobCluster().get().getLastJobCount()); jobClusterManagerActor.tell(new JobClusterManagerProto.CreateJobClusterRequest( fakeJobCluster, @@ -893,9 +893,32 @@ public void testJobClusterCreateDupFails() throws MalformedURLException { GetJobClusterResponse resp4 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp4.responseCode); assertEquals(clusterName, resp4.getJobCluster().get().getName()); + assertEquals(0L, resp4.getJobCluster().get().getLastJobCount()); + } - //assertEquals(jobClusterManagerActor, probe.getLastSender().path()); + @Test + public void testJobClusterCreateDeletedDup() throws IOException { + TestKit probe = new TestKit(system); + String clusterName = "testJobClusterCreateCluster"; + List completedJobs = new ArrayList<>(); + completedJobs.add(new CompletedJob(clusterName, clusterName + "-45", "123", JobState.Completed, 0, 0, "me", new ArrayList<>())); + when(jobStoreMock.loadCompletedJobsForCluster(clusterName, 1, null)) + .thenReturn(completedJobs); + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn( + clusterName, + Lists.newArrayList()); + jobClusterManagerActor.tell(new JobClusterManagerProto.CreateJobClusterRequest( + fakeJobCluster, + "user"), probe.getRef()); + JobClusterManagerProto.CreateJobClusterResponse resp = probe.expectMsgClass( + JobClusterManagerProto.CreateJobClusterResponse.class); + assertEquals(SUCCESS_CREATED, resp.responseCode); + jobClusterManagerActor.tell(new GetJobClusterRequest(clusterName), probe.getRef()); + GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); + assertEquals(SUCCESS, resp2.responseCode); + assertEquals(clusterName, resp2.getJobCluster().get().getName()); + assertEquals(45L, resp2.getJobCluster().get().getLastJobCount()); } @Test