From 0936beb64d3135cadac7252691ebdfd17a8c3f9b Mon Sep 17 00:00:00 2001 From: fdc-ntflx <103213338+fdc-ntflx@users.noreply.github.com> Date: Tue, 13 Jun 2023 04:04:46 -0700 Subject: [PATCH] Add remove/get job artifacts to cache endpoints. (#458) --- .../resourcecluster/ResourceCluster.java | 4 +++ .../server/worker/TaskExecutorGateway.java | 2 +- ...esourceClustersNonLeaderRedirectRoute.java | 36 ++++++++++++++++--- .../resourcecluster/ResourceClusterActor.java | 32 +++++++++++++++++ .../ResourceClusterAkkaImpl.java | 26 ++++++++++++++ .../ResourceClustersManagerActor.java | 6 ++++ .../proto/ResourceClusterScaleRuleProto.java | 4 +-- .../IMantisPersistenceProvider.java | 2 ++ .../KeyValueBasedPersistenceProvider.java | 10 ++++++ .../master/persistence/MantisJobStore.java | 4 +++ 10 files changed, 119 insertions(+), 7 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index 370e011d5..992f55288 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -66,6 +66,10 @@ public interface ResourceCluster extends ResourceClusterGateway { CompletableFuture addNewJobArtifactsToCache(ClusterID clusterID, List artifacts); + CompletableFuture removeJobArtifactsToCache(List artifacts); + + CompletableFuture> getJobArtifactsToCache(); + /** * Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there * are no task executors. diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/worker/TaskExecutorGateway.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/worker/TaskExecutorGateway.java index aa24d34f2..5b6ffd785 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/worker/TaskExecutorGateway.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/worker/TaskExecutorGateway.java @@ -38,7 +38,7 @@ public interface TaskExecutorGateway extends RpcGateway { CompletableFuture submitTask(ExecuteStageRequest request); /** - * instruct the task executor on which job artifact to cache in order to speed up job initialization time. + * instruct the task executor on which job artifacts to cache in order to speed up job initialization time. * * @param request List of job artifacts that need to be cached. * @return Ack in any case (this task is best effort). diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java index f190fff35..a0688c2dc 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersNonLeaderRedirectRoute.java @@ -42,7 +42,7 @@ import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.CreateResourceClusterScaleRuleRequest; import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesRequest; import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse; -import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.JobArtifactToCacheRequest; +import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.JobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest; import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse; import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest; @@ -87,7 +87,9 @@ *

* /api/v1/resourceClusters/{}/taskExecutors/{}/getTaskExecutorState (GET) *

+ * /api/v1/resourceClusters/{}/cacheJobArtifacts (GET) * /api/v1/resourceClusters/{}/cacheJobArtifacts (POST) + * /api/v1/resourceClusters/{}/cacheJobArtifacts (DELETE) * * [Notes] * To upgrade cluster containers: each container running task executor is using docker image tag based image version. @@ -239,19 +241,26 @@ protected Route constructRoutes() { path( PathMatchers.segment().slash("cacheJobArtifacts"), (clusterName) -> pathEndOrSingleSlash(() -> concat( + // GET + get(() -> withFuture(gateway.getClusterFor(getClusterID(clusterName)).getJobArtifactsToCache())), + // POST - post(() -> cacheJobArtifacts(clusterName)) + post(() -> cacheJobArtifacts(clusterName)), + + // DELETE + delete(() -> removeJobArtifactsToCache(clusterName)) )) ), // /api/v1/resourceClusters/{}/taskExecutors/{}/getTaskExecutorState pathPrefix( PathMatchers.segment().slash("taskExecutors"), - (clusterName) -> + (clusterName) -> concat ( path( PathMatchers.segment().slash("getTaskExecutorState"), (taskExecutorId) -> pathEndOrSingleSlash(() -> concat(get(() -> getTaskExecutorState(getClusterID(clusterName), getTaskExecutorID(taskExecutorId)))))) + ) ) )); @@ -458,7 +467,7 @@ private Route getScaleRules(String clusterId) { } private Route cacheJobArtifacts(String clusterId) { - return entity(Jackson.unmarshaller(JobArtifactToCacheRequest.class), request -> { + return entity(Jackson.unmarshaller(JobArtifactsToCacheRequest.class), request -> { log.info("POST /api/v1/resourceClusters/{}/cacheJobArtifacts {}", clusterId, request); final CompletionStage response = gateway.getClusterFor(getClusterID(clusterId)).addNewJobArtifactsToCache(request.getClusterID(), request.getArtifacts()); @@ -474,4 +483,23 @@ private Route cacheJobArtifacts(String clusterId) { ); }); } + + private Route removeJobArtifactsToCache(String clusterId) { + return entity(Jackson.unmarshaller(JobArtifactsToCacheRequest.class), request -> { + log.info("DELETE /api/v1/resourceClusters/{}/cacheJobArtifacts {}", clusterId, request); + + final CompletionStage response = + gateway.getClusterFor(getClusterID(clusterId)).removeJobArtifactsToCache(request.getArtifacts()); + + return completeAsync( + response.thenApply(dontCare -> new BaseResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "job artifacts removed successfully")), + resp -> complete( + StatusCodes.OK, + request.getArtifacts(), + Jackson.marshaller()), + Endpoints.RESOURCE_CLUSTERS, + HttpRequestMetrics.HttpVerb.DELETE + ); + }); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index a68c656e7..011dc04a6 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -60,6 +60,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -197,6 +198,8 @@ public Receive createReceive() { .match(PublishResourceOverviewMetricsRequest.class, this::onPublishResourceOverviewMetricsRequest) .match(CacheJobArtifactsOnTaskExecutorRequest.class, this::onCacheJobArtifactsOnTaskExecutorRequest) .match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest) + .match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest) + .match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self())) .build(); } @@ -204,11 +207,22 @@ private void onAddNewJobArtifactsToCacheRequest(AddNewJobArtifactsToCacheRequest try { mantisJobStore.addNewJobArtifactsToCache(req.getClusterID(), req.getArtifacts()); jobArtifactsToCache.addAll(req.artifacts); + sender().tell(Ack.getInstance(), self()); } catch (IOException e) { log.warn("Cannot add new job artifacts {} to cache in cluster: {}", req.getArtifacts(), req.getClusterID(), e); } } + private void onRemoveJobArtifactsToCacheRequest(RemoveJobArtifactsToCacheRequest req) { + try { + mantisJobStore.removeJobArtifactsToCache(req.getClusterID(), req.getArtifacts()); + req.artifacts.forEach(jobArtifactsToCache::remove); + sender().tell(Ack.getInstance(), self()); + } catch (IOException e) { + log.warn("Cannot remove job artifacts {} to cache in cluster: {}", req.getArtifacts(), req.getClusterID(), e); + } + } + private void fetchJobArtifactsToCache() { try { mantisJobStore.getJobArtifactsToCache(clusterID) @@ -775,6 +789,11 @@ static class TaskExecutorsList { List taskExecutors; } + @Value + static class ArtifactList { + List artifacts; + } + @Value static class GetClusterUsageRequest { ClusterID clusterID; @@ -808,6 +827,19 @@ static class AddNewJobArtifactsToCacheRequest { List artifacts; } + @Value + @Builder + static class RemoveJobArtifactsToCacheRequest { + ClusterID clusterID; + List artifacts; + } + + @Value + @Builder + static class GetJobArtifactsToCacheRequest { + ClusterID clusterID; + } + /** * Represents the Availability of a given node in the resource cluster. * Can go from PENDING -> ASSIGNED(workerId) -> RUNNING(workerId) -> PENDING diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 1318d4668..c239fd077 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -20,15 +20,18 @@ import akka.pattern.Patterns; import io.mantisrx.common.Ack; import io.mantisrx.master.resourcecluster.ResourceClusterActor.AddNewJobArtifactsToCacheRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.ArtifactList; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorWorkerMappingRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.InitializeTaskExecutorRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; @@ -147,6 +150,29 @@ public CompletableFuture addNewJobArtifactsToCache(ClusterID clusterID, Lis .toCompletableFuture(); } + @Override + public CompletableFuture removeJobArtifactsToCache(List artifacts) { + return Patterns + .ask( + resourceClusterManagerActor, + new RemoveJobArtifactsToCacheRequest(clusterID, artifacts), + askTimeout) + .thenApply(Ack.class::cast) + .toCompletableFuture(); + } + + @Override + public CompletableFuture> getJobArtifactsToCache() { + return Patterns + .ask( + resourceClusterManagerActor, + new GetJobArtifactsToCacheRequest(clusterID), + askTimeout) + .thenApply(ArtifactList.class::cast) + .toCompletableFuture() + .thenApply(ArtifactList::getArtifacts); + } + @Override public CompletableFuture getTaskExecutorFor(TaskExecutorAllocationRequest allocationRequest) { return diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index a6bf34886..f7894531e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -27,9 +27,11 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; @@ -145,6 +147,10 @@ public Receive createReceive() { getRCActor(req.getClusterID()).forward(req, context())) .match(AddNewJobArtifactsToCacheRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) + .match(RemoveJobArtifactsToCacheRequest.class, req -> + getRCActor(req.getClusterID()).forward(req, context())) + .match(GetJobArtifactsToCacheRequest.class, req -> + getRCActor(req.getClusterID()).forward(req, context())) .match(TriggerClusterRuleRefreshRequest.class, req -> getRCScalerActor(req.getClusterID()).forward(req, context())) .match(SetResourceClusterScalerStatusRequest.class, req -> diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/proto/ResourceClusterScaleRuleProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/proto/ResourceClusterScaleRuleProto.java index b59af5f8f..2830142a5 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/proto/ResourceClusterScaleRuleProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/proto/ResourceClusterScaleRuleProto.java @@ -95,14 +95,14 @@ public static class ResourceClusterScaleRule { @EqualsAndHashCode(callSuper = true) @Value - public static class JobArtifactToCacheRequest extends BaseRequest { + public static class JobArtifactsToCacheRequest extends BaseRequest { ClusterID clusterID; @Singular @NonNull List artifacts; - public JobArtifactToCacheRequest(@JsonProperty("clusterID") ClusterID clusterID, @JsonProperty("artifacts") List artifacts) { + public JobArtifactsToCacheRequest(@JsonProperty("clusterID") ClusterID clusterID, @JsonProperty("artifacts") List artifacts) { super(); Preconditions.checkNotNull(clusterID, "clusterID cannot be null"); Preconditions.checkNotNull(artifacts, "artifacts cannot be null"); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/IMantisPersistenceProvider.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/IMantisPersistenceProvider.java index 6be93e490..b770493b7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/IMantisPersistenceProvider.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/IMantisPersistenceProvider.java @@ -168,6 +168,8 @@ void storeAndUpdateWorkers(final IMantisWorkerMetadata existingWorker, final IMa void addNewJobArtifactsToCache(ClusterID clusterID, List artifacts) throws IOException; + void removeJobArtifactsToCache(ClusterID clusterID, List artifacts) throws IOException; + List listJobArtifactsToCache(ClusterID clusterID) throws IOException; List listJobArtifactsByName(String prefix, String contains) throws IOException; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java index 3f04c2c43..2708d0530 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.java @@ -702,6 +702,16 @@ public void addNewJobArtifactsToCache(ClusterID clusterID, List arti } } + @Override + public void removeJobArtifactsToCache(ClusterID clusterID, List artifacts) throws IOException { + for (ArtifactID artifact: artifacts) { + kvStore.delete( + JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, + clusterID.getResourceID(), + artifact.getResourceID()); + } + } + @Override public List listJobArtifactsToCache(ClusterID clusterID) throws IOException { return new ArrayList<>(kvStore.getAll(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterID.getResourceID()) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/MantisJobStore.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/MantisJobStore.java index 0b9610b3c..32572470d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/MantisJobStore.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/persistence/MantisJobStore.java @@ -267,6 +267,10 @@ public void addNewJobArtifactsToCache(ClusterID clusterID, List arti storageProvider.addNewJobArtifactsToCache(clusterID, artifacts); } + public void removeJobArtifactsToCache(ClusterID clusterID, List artifacts) throws IOException { + storageProvider.removeJobArtifactsToCache(clusterID, artifacts); + } + public List getJobArtifactsToCache(ClusterID clusterID) throws IOException { return storageProvider.listJobArtifactsToCache(clusterID); }