From e8dadbae8abdd8e96471eab2e9e9febb9b73ed2a Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 3 Jan 2025 12:11:45 -0500 Subject: [PATCH] Check archived jobs for last known job number before creating cluster We ran into a bug where we deleted a job cluster and then recreated the job cluster with the same name. The old job cluster had 4 stages and the new one had two. When a job was completed, it would write to the archived tables. However, there already existed a job cluster there with the same ID. The KV provider only overwrote the rows for stages 1 and 2. It did not delete the values for stages 3 and 4. When Mantis tried to load the archived job, it would see job metadata indicating 2 stages, but then would receive 4 stages (two from the new job and 4 from the old job). This would lead to the Mantis not loading the job. We could probably consider this a bug in the Dynamo KV Provider, _but_ it felt like we don't want to overwrite archived jobs in any scenario since we'd like to maintain a record of those jobs. Instead, the problem is further upstream. When we create a job, we should be reasonably confident that the Job ID is globally unique. However, when creating a job cluster, the `lastJobCount` value is always set to 0. We should instead check if there are any archived jobs with the same cluster name. If so, we should grab the last value and set that as the last known job number. We desire the following scenario 1. Create a job cluster "MyJob" 2. Create a job "MyJob-1" 3. Delete the job and job cluster 4. Create another job cluster MyJob 5. Create a job "MyJob-2" instead of "MyJob-1" Previously, we would have an archived job "MyJob-1" and an active job "MyJob-1" that are distinct. Stopping the active one would overwrite the archived one. --- .../master/JobClustersManagerActor.java | 13 ++- .../jobcluster/proto/JobClusterProto.java | 5 +- .../master/jobcluster/JobClusterAkkaTest.java | 90 +++++++++---------- .../job/JobClusterManagerAkkaTest.java | 31 ++++++- 4 files changed, 87 insertions(+), 52 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index da57f1a1a..c0538f38a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -447,7 +447,18 @@ public void onJobClusterCreate(final CreateJobClusterRequest request) { try { Optional jobClusterInfoO = jobClusterInfoManager.createClusterActorAndRegister(request.getJobClusterDefinition()); if (jobClusterInfoO.isPresent()) { - jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), request.getUser(), getSender())); + // Check if the job cluster name used to exist but was deleted + // If it was, use the last known job number from that cluster instead of starting from 0 + // This ensures that there are no conflicts between archived jobs and old jobs + // e.g. If a job cluster with the name "MyJobCluster" was deleted, but had run a job, then + // we'd have a Job ID of `MyJobCluster-1` in the archived jobs table. When the new job cluster + // came online it may partially overwrite the old archived job if we started a new job with ID MyJobCluster-1. + List completedJobs = jobStore.loadCompletedJobsForCluster(request.getJobClusterDefinition().getName(), 1, null); + long lastJobNum = completedJobs.stream() + .map((job) -> JobId.fromId(job.getJobId()).map(JobId::getJobNum).orElse(0L)) + .max(Long::compareTo) + .orElse(0L); + jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), lastJobNum, request.getUser(), getSender())); } else { getSender().tell(new CreateJobClusterResponse( request.requestId, CLIENT_ERROR, diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java index 594b1e528..1396c37d1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterProto.java @@ -74,11 +74,12 @@ public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefi /** * Invoked during Job Cluster Creation * @param jobClusterDefinition + * @param lastJobNumber * @param user * @param requestor */ - public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, String user, ActorRef requestor) { - this(jobClusterDefinition, false, 0, Lists.newArrayList(), user, requestor, true); + public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, long lastJobNumber, String user, ActorRef requestor) { + this(jobClusterDefinition, false, lastJobNumber, Lists.newArrayList(), user, requestor, true); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index ae68d5d27..e927145fc 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -343,7 +343,7 @@ public void testJobClusterCreate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(name); ActorRef jobClusterActor = system.actorOf(props(name, jobStoreMock, schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -390,7 +390,7 @@ public void testJobClusterEnable() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -461,7 +461,7 @@ public void testJobClusterUpdateAndDelete() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -528,7 +528,7 @@ public void testJobClusterUpdateFailsIfArtifactNotUnique() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -556,7 +556,7 @@ public void testJobClusterDeleteFailsIfJobsActive() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -593,7 +593,7 @@ public void testJobClusterDeletePurgesCompletedJobs() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, labels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -636,7 +636,7 @@ public void testJobClusterDisable() throws InterruptedException { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -731,7 +731,7 @@ public void testJobClusterSLAUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); SLA newSLA = new SLA(0,10,null,null); @@ -764,7 +764,7 @@ public void testJobClusterMigrationConfigUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); WorkerMigrationConfig newConfig = new WorkerMigrationConfig(MigrationStrategyEnum.ONE_WORKER, "{'name':'value'}"); @@ -797,7 +797,7 @@ public void testJobClusterArtifactUpdate() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -836,7 +836,7 @@ public void testJobClusterArtifactUpdateNotUniqueFails() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -873,7 +873,7 @@ public void testJobClusterArtifactUpdateMultipleTimes() throws Exception { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -941,7 +941,7 @@ public void testJobClusterInvalidSLAUpdateIgnored() throws Exception { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -976,7 +976,7 @@ public void testJobClusterLabelsUpdate() throws Exception { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); // assert initially no labels @@ -1032,7 +1032,7 @@ public void testJobSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1070,7 +1070,7 @@ public void testJobSubmitWithNoJarAndSchedInfo() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1123,7 +1123,7 @@ public void testJobSubmitWithVersionAndNoSchedInfo() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1231,7 +1231,7 @@ public void testJobComplete() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1272,7 +1272,7 @@ public void testJobKillTriggersSLAToLaunchNew() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -1328,7 +1328,7 @@ public void testJobSubmitTriggersSLAToKillOld() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -1410,7 +1410,7 @@ public void testJobSubmitTriggersSLAToKillOldHandlesErrors() { SLA sla = new SLA(1,1,null,null); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1493,7 +1493,7 @@ public void testCronTriggersSLAToKillOld() { SLA sla = new SLA(1,1,"0/1 * * * * ?",IJobClusterDefinition.CronPolicy.KEEP_NEW); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1543,7 +1543,7 @@ public void testJobSubmitWithUnique() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1592,7 +1592,7 @@ public void testJobSubmitWithoutInheritInstance() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1644,7 +1644,7 @@ public void testJobSubmitWithInheritInstanceFlagsSingleStage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1710,7 +1710,7 @@ public void testJobSubmitWithInheritInstanceFlagsMultiStage() { createFakeJobClusterDefn(clusterName, Lists.newArrayList(), NO_OP_SLA, schedulingInfo1); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest( - fakeJobCluster, user, probe.getRef()), probe.getRef()); + fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1786,7 +1786,7 @@ public void testJobSubmitWithInheritInstanceFlagsScaled() { createFakeJobClusterDefn(clusterName, Lists.newArrayList(), NO_OP_SLA, schedulingInfo1); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest( - fakeJobCluster, user, probe.getRef()), probe.getRef()); + fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1854,7 +1854,7 @@ public void testQuickJobSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1895,7 +1895,7 @@ public void testQuickJobSubmitWithNoSchedInfoInPreviousJob() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -1949,7 +1949,7 @@ public void testJobSubmitWithNoSchedInfoUsesJobClusterValues() { clusterLabels.add(label); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, clusterLabels); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2026,7 +2026,7 @@ public void testQuickJobSubmitWithNoPreviousHistoryFails() { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2061,7 +2061,7 @@ public void testUpdateJobClusterArtifactWithAutoSubmit() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2132,7 +2132,7 @@ public void testJobSubmitFails() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); Mockito.doThrow(Exception.class).when(jobStoreMock).storeNewJob(any()); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2168,7 +2168,7 @@ public void testGetLastSubmittedJobSubject() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2224,7 +2224,7 @@ public void testGetLastSubmittedJobSubjectWithWrongClusterNameFails() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2268,7 +2268,7 @@ public void testListArchivedWorkers() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStore, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2327,7 +2327,7 @@ public void testZombieWorkerKilledOnMessage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2361,7 +2361,7 @@ public void testZombieWorkerTerminateEventIgnored() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2388,7 +2388,7 @@ public void testResubmitWorker() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2448,7 +2448,7 @@ public void testScaleStage() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2514,7 +2514,7 @@ public void testGetJobDetailsForArchivedJob() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); String jobId = clusterName + "-1"; @@ -2566,7 +2566,7 @@ public void testListJobIdsForCluster() throws InvalidJobException { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2674,7 +2674,7 @@ public void testListJobsForCluster() throws InvalidJobException, InterruptedExce MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2849,7 +2849,7 @@ public void testListJobWithLabelMatch() { MantisJobStore jobStoreMock = mock(MantisJobStore.class); final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); @@ -2952,7 +2952,7 @@ public void testLostWorkerGetsReplaced() { final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreSpied, schedulerMockFactory, eventPublisher, costsCalculator, 0)); - jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, 0, user, probe.getRef()), probe.getRef()); JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); assertEquals(SUCCESS, createResp.responseCode); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java index b388994ec..c3095cba7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerAkkaTest.java @@ -107,6 +107,7 @@ import java.net.MalformedURLException; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -856,9 +857,7 @@ public void testJobClusterCreate() throws MalformedURLException { GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp2.responseCode); assertEquals(clusterName, resp2.getJobCluster().get().getName()); - - //assertEquals(jobClusterManagerActor, probe.getLastSender().path()); - + assertEquals(0L, resp2.getJobCluster().get().getLastJobCount()); } @Test @@ -879,6 +878,7 @@ public void testJobClusterCreateDupFails() throws MalformedURLException { GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp2.responseCode); assertEquals(clusterName, resp2.getJobCluster().get().getName()); + assertEquals(0L, resp2.getJobCluster().get().getLastJobCount()); jobClusterManagerActor.tell(new JobClusterManagerProto.CreateJobClusterRequest( fakeJobCluster, @@ -893,9 +893,32 @@ public void testJobClusterCreateDupFails() throws MalformedURLException { GetJobClusterResponse resp4 = probe.expectMsgClass(GetJobClusterResponse.class); assertEquals(SUCCESS, resp4.responseCode); assertEquals(clusterName, resp4.getJobCluster().get().getName()); + assertEquals(0L, resp4.getJobCluster().get().getLastJobCount()); + } - //assertEquals(jobClusterManagerActor, probe.getLastSender().path()); + @Test + public void testJobClusterCreateDeletedDup() throws IOException { + TestKit probe = new TestKit(system); + String clusterName = "testJobClusterCreateCluster"; + List completedJobs = new ArrayList<>(); + completedJobs.add(new CompletedJob(clusterName, clusterName + "-45", "123", JobState.Completed, 0, 0, "me", new ArrayList<>())); + when(jobStoreMock.loadCompletedJobsForCluster(clusterName, 1, null)) + .thenReturn(completedJobs); + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn( + clusterName, + Lists.newArrayList()); + jobClusterManagerActor.tell(new JobClusterManagerProto.CreateJobClusterRequest( + fakeJobCluster, + "user"), probe.getRef()); + JobClusterManagerProto.CreateJobClusterResponse resp = probe.expectMsgClass( + JobClusterManagerProto.CreateJobClusterResponse.class); + assertEquals(SUCCESS_CREATED, resp.responseCode); + jobClusterManagerActor.tell(new GetJobClusterRequest(clusterName), probe.getRef()); + GetJobClusterResponse resp2 = probe.expectMsgClass(GetJobClusterResponse.class); + assertEquals(SUCCESS, resp2.responseCode); + assertEquals(clusterName, resp2.getJobCluster().get().getName()); + assertEquals(45L, resp2.getJobCluster().get().getLastJobCount()); } @Test