Check archived jobs for last known job number before creating cluster #738

Open · wants to merge 1 commit into base: master
@@ -447,7 +447,18 @@ public void onJobClusterCreate(final CreateJobClusterRequest request) {
     try {
         Optional<JobClusterInfo> jobClusterInfoO = jobClusterInfoManager.createClusterActorAndRegister(request.getJobClusterDefinition());
         if (jobClusterInfoO.isPresent()) {
-            jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), request.getUser(), getSender()));
+            // Check whether the job cluster name used to exist but was deleted.
+            // If it was, use the last known job number from that cluster instead of starting from 0.
+            // This ensures there are no conflicts between archived jobs and newly created jobs.
+            // e.g. if a job cluster named "MyJobCluster" was deleted but had run a job, the archived
+            // jobs table would contain the Job ID `MyJobCluster-1`. When the new job cluster came online,
+            // it could partially overwrite that archived job if it started a new job with ID MyJobCluster-1.
+            List<CompletedJob> completedJobs = jobStore.loadCompletedJobsForCluster(request.getJobClusterDefinition().getName(), 1, null);
Collaborator commented on the loadCompletedJobsForCluster line:
My main concern here is that this loadCompletedJobsForCluster call is essentially doing a getAll (the limit is applied after fetching all rows), while this creation logic sits on the API response path with a default 1 second timeout. The costly call to fetch all rows can therefore cause timeouts and other errors here.
What about switching to a different call that loads only the single row needed here instead of the full partition?
Another alternative would be to add a flag to the create request that makes the "globally unique job id" behavior an optional argument, so we can still disable this call if needed.
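To make the performance concern concrete, here is a minimal, self-contained sketch in plain Java. The in-memory "partition" and both method names are hypothetical stand-ins, not Mantis APIs: the first method mirrors a store that materializes every row and applies the limit afterwards, the second mirrors the suggested single-row lookup.

import java.util.ArrayList;
import java.util.List;

public class LimitPushdownSketch {
    // Current shape of the call: the store effectively does a getAll for the cluster's
    // partition and the limit is applied only afterwards, so cost grows with archive size.
    static List<String> loadCompletedJobsForCluster(List<String> partition, int limit) {
        List<String> all = new ArrayList<>(partition);          // "getAll rows"
        return all.subList(0, Math.min(limit, all.size()));     // limit applied after the fetch
    }

    // Reviewer's suggestion, sketched: ask the store for just the one row that is needed.
    static String loadLastCompletedJobForCluster(List<String> partition) {
        return partition.isEmpty() ? null : partition.get(0);   // single-row read
    }

    public static void main(String[] args) {
        // Newest-first ordering is assumed here purely for illustration.
        List<String> archived = List.of("MyJobCluster-3", "MyJobCluster-2", "MyJobCluster-1");
        System.out.println(loadCompletedJobsForCluster(archived, 1)); // [MyJobCluster-3]
        System.out.println(loadLastCompletedJobForCluster(archived)); // MyJobCluster-3
    }
}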

+            long lastJobNum = completedJobs.stream()
+                    .map((job) -> JobId.fromId(job.getJobId()).map(JobId::getJobNum).orElse(0L))
+                    .max(Long::compareTo)
+                    .orElse(0L);
+            jobClusterInfoManager.initializeClusterAsync(jobClusterInfoO.get(), new JobClusterProto.InitializeJobClusterRequest(request.getJobClusterDefinition(), lastJobNum, request.getUser(), getSender()));
} else {
getSender().tell(new CreateJobClusterResponse(
request.requestId, CLIENT_ERROR,
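As a side note on the new stream above, here is a small, self-contained sketch of how the highest archived job number is recovered from job IDs. The regex is only a stand-in for JobId.fromId, which is assumed here to parse IDs of the form <clusterName>-<number>.

import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LastJobNumSketch {
    // Stand-in for JobId.fromId(...).map(JobId::getJobNum): assumes IDs look like "<cluster>-<number>".
    private static final Pattern JOB_ID = Pattern.compile("^(.+)-(\\d+)$");

    static Optional<Long> jobNum(String jobId) {
        Matcher m = JOB_ID.matcher(jobId);
        return m.matches() ? Optional.of(Long.parseLong(m.group(2))) : Optional.empty();
    }

    public static void main(String[] args) {
        List<String> archivedJobIds = List.of("MyJobCluster-1", "MyJobCluster-3", "not-a-job-id-");

        // Same shape as the PR's stream: unparsable IDs fall back to 0, then take the max.
        long lastJobNum = archivedJobIds.stream()
                .map(id -> jobNum(id).orElse(0L))
                .max(Long::compareTo)
                .orElse(0L);

        System.out.println(lastJobNum); // 3 -> the recreated cluster resumes numbering after MyJobCluster-3
    }
}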
@@ -74,11 +74,12 @@ public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefi
 /**
  * Invoked during Job Cluster Creation
  * @param jobClusterDefinition
+ * @param lastJobNumber
  * @param user
  * @param requestor
  */
-public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, String user, ActorRef requestor) {
-    this(jobClusterDefinition, false, 0, Lists.newArrayList(), user, requestor, true);
+public InitializeJobClusterRequest(final JobClusterDefinitionImpl jobClusterDefinition, long lastJobNumber, String user, ActorRef requestor) {
+    this(jobClusterDefinition, false, lastJobNumber, Lists.newArrayList(), user, requestor, true);

}

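The old constructor hard-coded the delegated lastJobNumber argument to 0, so a recreated cluster always started numbering from scratch. Assuming, as the comment in the first diff implies, that the cluster assigns the next job number by incrementing from lastJobNumber, a minimal arithmetic sketch of the collision and its fix follows; all names below are illustrative, not Mantis code.

public class JobIdCollisionSketch {
    // Assumption drawn from the PR description: the next job ID is "<clusterName>-<lastJobNumber + 1>".
    static String nextJobId(String clusterName, long lastJobNumber) {
        return clusterName + "-" + (lastJobNumber + 1);
    }

    public static void main(String[] args) {
        String archivedJobId = "MyJobCluster-1"; // left behind by the deleted cluster

        // Old behaviour: lastJobNumber defaulted to 0, so the first new job reuses the archived ID.
        System.out.println(nextJobId("MyJobCluster", 0).equals(archivedJobId)); // true -> collision

        // New behaviour: lastJobNumber is seeded from the archived jobs (1 here), avoiding the overwrite.
        System.out.println(nextJobId("MyJobCluster", 1)); // MyJobCluster-2
    }
}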