diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java index 8e10fe7e47b9..58aa8a9869a3 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java @@ -18,7 +18,7 @@ public class JobQueueConfigProducer { // The interval in milliseconds to poll for job updates. static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty( - "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 1000 + "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 3000 ); /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 86f5b24ca77d..8ada82da14c0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -7,7 +7,6 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.queue.JobQueue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotmarketing.exception.DotDataException; import java.util.Map; import java.util.Optional; @@ -94,10 +93,10 @@ String createJob(String queueName, Map parameters) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException; + throws DotDataException; /** * Retrieves a list of jobs. @@ -115,9 +114,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of completed jobs @@ -125,9 +124,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of completed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of canceled jobs @@ -135,9 +134,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of canceled jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of failed jobs @@ -145,9 +144,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of failed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException; /** * Cancels a job. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index ffb777cf8791..0c74f378d4e6 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -1,8 +1,11 @@ package com.dotcms.jobs.business.api; +import com.dotcms.api.system.event.Payload; +import com.dotcms.api.system.event.SystemEventType; import com.dotcms.business.CloseDBIfOpened; import com.dotcms.business.WrapInTransaction; import com.dotcms.jobs.business.api.events.EventProducer; +import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.JobCanceledEvent; import com.dotcms.jobs.business.api.events.JobCancellingEvent; import com.dotcms.jobs.business.api.events.JobCompletedEvent; @@ -14,6 +17,7 @@ import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; +import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; @@ -30,11 +34,14 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.system.event.local.model.EventSubscriber; +import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; +import io.vavr.control.Try; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; @@ -170,6 +177,11 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, // Events this.realTimeJobMonitor = realTimeJobMonitor; this.eventProducer = eventProducer; + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancelRequestEvent.class, + (EventSubscriber) this::onCancelRequestJob + ); } @Override @@ -304,11 +316,11 @@ public Job getJob(final String jobId) throws DotDataException { @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException { + throws DotDataException { try { return jobQueue.getActiveJobs(queueName, page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @@ -324,45 +336,41 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot @CloseDBIfOpened @Override - public JobPaginatedResult getActiveJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getActiveJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCompletedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCompletedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching completed jobs", e); + throw new DotDataException("Error fetching completed jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCanceledJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCanceledJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching canceled jobs", e); + throw new DotDataException("Error fetching canceled jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getFailedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getFailedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching failed jobs", e); + throw new DotDataException("Error fetching failed jobs", e); } } @@ -370,37 +378,60 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) @Override public void cancelJob(final String jobId) throws DotDataException { - final Job job; - try { - job = jobQueue.getJob(jobId); - } catch (JobNotFoundException e) { - throw new DoesNotExistException(e); - } catch (JobQueueDataException e) { - throw new DotDataException("Error fetching job", e); + final Job job = getJob(jobId); + + if (job.state() == JobState.PENDING || job.state() == JobState.RUNNING) { + handleJobCancelRequest(job); + } else { + Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. " + + "Current state: " + job.state()); } - final Optional instance = getInstance(jobId); - if (instance.isPresent()) { - final var processor = instance.get(); - if (processor instanceof Cancellable) { - try { - Logger.info(this, "Cancelling job " + jobId); - ((Cancellable) processor).cancel(job); - handleJobCancelling(job, processor); - } catch (Exception e) { - final var error = new DotDataException("Error cancelling job " + jobId, e); - Logger.error(JobQueueManagerAPIImpl.class, error); - throw error; + } + + /** + * Handles the cancellation of a job based on the given JobCancelRequestEvent. Retrieves the job + * and checks its state. If the job is in a PENDING or RUNNING state, attempts to cancel it by + * leveraging the job's associated processor. Logs and throws exceptions if any issues occur + * during the cancellation process. + * + * @param event The event that triggers the job cancellation request. + */ + @VisibleForTesting + @WrapInTransaction + void onCancelRequestJob(final JobCancelRequestEvent event) { + + try { + + final var job = getJob(event.getJob().id()); + if (job.state() == JobState.PENDING + || job.state() == JobState.RUNNING + || job.state() == JobState.CANCEL_REQUESTED) { + + final Optional instance = getInstance(job.id()); + if (instance.isPresent()) { + final var processor = instance.get(); + if (processor instanceof Cancellable) { + handleJobCancelling(job, processor); + } else { + final var error = new JobCancellationException( + job.id(), "Job is not Cancellable"); + Logger.error(JobQueueManagerAPIImpl.class, error); + throw error; + } + } else { + // In a cluster, the job may be running on another server + Logger.debug(this, + "Job cancellation requested. No processor found for job " + job.id()); } } else { - final var error = new DotDataException("Job " + jobId + " cannot be canceled"); - Logger.error(JobQueueManagerAPIImpl.class, error); - throw error; + Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. " + + "Current state: " + job.state()); } - } else { - Logger.error(this, "No processor found for job " + jobId); - throw new JobProcessorNotFoundException(job.queueName(), jobId); + } catch (DotDataException e) { + throw new JobCancellationException(event.getJob().id(), e); } + } @Override @@ -465,6 +496,24 @@ private void pollJobUpdates() { } } + /** + * Fetches the state of a job from the job queue using the provided job ID. + * + * @param jobId the unique identifier of the job whose state is to be fetched + * @return the current state of the job + * @throws DotDataException if there is an error accessing the job state data + */ + @CloseDBIfOpened + private JobState getJobState(final String jobId) throws DotDataException { + try { + return jobQueue.getJobState(jobId); + } catch (JobNotFoundException e) { + throw new DoesNotExistException(e); + } catch (JobQueueDataException e) { + throw new DotDataException("Error fetching job state", e); + } + } + /** * Updates the progress of a job and notifies its watchers. * @@ -484,7 +533,11 @@ private float updateJobProgress(final Job job, final ProgressTracker progressTra // Only update progress if it has changed if (progress > previousProgress) { - Job updatedJob = job.withProgress(progress); + // Make sure we have the latest state, the job of the running processor won't + // be updated with changes on the state, like a cancel request. + final var latestState = getJobState(job.id()); + + Job updatedJob = job.withProgress(progress).withState(latestState); jobQueue.updateJobProgress(job.id(), updatedJob.progress()); eventProducer.getEvent(JobProgressUpdatedEvent.class).fire( @@ -708,19 +761,14 @@ private void processJob(final Job job) throws DotDataException { } catch (DotDataException e) { throw new DotRuntimeException("Error updating job progress", e); } - }, 0, 1, TimeUnit.SECONDS + }, 0, 2, TimeUnit.SECONDS ); // Process the job processor.process(runningJob); - if (jobQueue.hasJobBeenInState(runningJob.id(), JobState.CANCELLING)) { - handleJobCancellation(runningJob, processor); - } else { - handleJobCompletion(runningJob, processor); - } - //Free up resources - removeInstanceRef(runningJob.id()); + // The job finished processing + handleJobCompletion(runningJob, processor); } catch (Exception e) { Logger.error(this, @@ -729,6 +777,9 @@ private void processJob(final Job job) throws DotDataException { handleJobFailure( runningJob, processor, e, "Job execution" ); + } finally { + //Free up resources + removeInstanceRef(runningJob.id()); } } else { @@ -808,27 +859,54 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) final float progress = getJobProgress(job); - final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); - updateJobStatus(completedJob); - eventProducer.getEvent(JobCompletedEvent.class).fire( - new JobCompletedEvent(completedJob, LocalDateTime.now()) - ); + try { + if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) { + Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); + updateJobStatus(canceledJob); + eventProducer.getEvent(JobCanceledEvent.class).fire( + new JobCanceledEvent(canceledJob, LocalDateTime.now()) + ); + } else { + final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); + updateJobStatus(completedJob); + eventProducer.getEvent(JobCompletedEvent.class).fire( + new JobCompletedEvent(completedJob, LocalDateTime.now()) + ); + } + } catch (JobQueueDataException e) { + final var errorMessage = "Error updating job status"; + Logger.error(this, errorMessage, e); + throw new DotDataException(errorMessage, e); + } } /** - * Handles the cancellation of a job. + * Handles the request to cancel a job. * - * @param job The job that was canceled. - * @param processor The processor that handled the job. + * @param job The job to cancel. */ @WrapInTransaction - private void handleJobCancelling(final Job job, final JobProcessor processor) - throws DotDataException { + private void handleJobCancelRequest(final Job job) throws DotDataException { - Job cancelJob = job.withState(JobState.CANCELLING); + Job cancelJob = job.withState(JobState.CANCEL_REQUESTED); updateJobStatus(cancelJob); - eventProducer.getEvent(JobCancellingEvent.class).fire( - new JobCancellingEvent(cancelJob, LocalDateTime.now()) + + // Prepare the cancel request events + final JobCancelRequestEvent cancelRequestEvent = new JobCancelRequestEvent( + cancelJob, LocalDateTime.now() + ); + + // LOCAL event + APILocator.getLocalSystemEventsAPI().notify(cancelRequestEvent); + + // CLUSTER WIDE event + Try.run(() -> APILocator.getSystemEventsAPI() + .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(cancelRequestEvent))) + .onFailure(e -> Logger.error(JobQueueManagerAPIImpl.this, e.getMessage())); + + // CDI event + eventProducer.getEvent(JobCancelRequestEvent.class).fire( + cancelRequestEvent ); } @@ -839,23 +917,22 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) * @param processor The processor that handled the job. */ @WrapInTransaction - private void handleJobCancellation(final Job job, final JobProcessor processor) - throws DotDataException { + private void handleJobCancelling(final Job job, final JobProcessor processor) { - final var resultMetadata = processor.getResultMetadata(job); + try { + Logger.info(this, "Cancelling job " + job.id()); + ((Cancellable) processor).cancel(job); - JobResult jobResult = null; - if (resultMetadata != null && !resultMetadata.isEmpty()) { - jobResult = JobResult.builder().metadata(resultMetadata).build(); + Job cancelJob = job.withState(JobState.CANCELLING); + updateJobStatus(cancelJob); + eventProducer.getEvent(JobCancellingEvent.class).fire( + new JobCancellingEvent(cancelJob, LocalDateTime.now()) + ); + } catch (DotDataException e) { + final var error = new JobCancellationException(job.id(), e); + Logger.error(this, error); + throw error; } - - final float progress = getJobProgress(job); - - Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); - updateJobStatus(canceledJob); - eventProducer.getEvent(JobCanceledEvent.class).fire( - new JobCanceledEvent(canceledJob, LocalDateTime.now()) - ); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java new file mode 100644 index 000000000000..9554b1e8f573 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java @@ -0,0 +1,38 @@ +package com.dotcms.jobs.business.api.events; + +import com.dotcms.jobs.business.job.Job; +import java.time.LocalDateTime; + +/** + * Event fired when there is a request to cancel a job. + */ +public class JobCancelRequestEvent { + + private final Job job; + private final LocalDateTime canceledOn; + + /** + * Constructs a new JobCancelRequestEvent. + * + * @param job The job to cancel. + * @param canceledOn The timestamp when the cancel request was made. + */ + public JobCancelRequestEvent(Job job, LocalDateTime canceledOn) { + this.job = job; + this.canceledOn = canceledOn; + } + + /** + * @return The job to cancel. + */ + public Job getJob() { + return job; + } + + /** + * @return The timestamp when the cancel request was made. + */ + public LocalDateTime getCanceledOn() { + return canceledOn; + } +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index 29c9eab2e0b7..4c761158fa0e 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -3,14 +3,13 @@ import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; import com.dotmarketing.util.Logger; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import java.util.function.Predicate; import javax.enterprise.context.ApplicationScoped; @@ -77,35 +76,90 @@ public class RealTimeJobMonitor { * its own filter predicate. Watchers are automatically removed when a job reaches a final state * (completed, cancelled, or removed).

* + *

Thread Safety:

+ *
    + *
  • This method is thread-safe and can be called concurrently from multiple threads.
  • + *
  • Internally uses {@link CopyOnWriteArrayList} to store watchers, which provides: + *
      + *
    • Thread-safe reads without synchronization - all iteration operations use an immutable snapshot
    • + *
    • Thread-safe modifications - each modification creates a new internal copy
    • + *
    • Memory consistency effects - actions in a thread prior to placing an object into a + * CopyOnWriteArrayList happen-before actions subsequent to the access or removal + * of that element from the CopyOnWriteArrayList in another thread
    • + *
    + *
  • + *
  • The trade-off is that modifications (adding/removing watchers) are more expensive as they + * create a new copy of the internal array, but this is acceptable since: + *
      + *
    • Reads (notifications) are much more frequent than writes (registering/removing watchers)
    • + *
    • The number of watchers per job is typically small
    • + *
    • Registration/removal of watchers is not in the critical path
    • + *
    + *
  • + *
+ * * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates * @param filter Optional predicate to filter job updates (null means receive all updates) - * @throws IllegalArgumentException if jobId or watcher is null + * @throws NullPointerException if jobId or watcher is null * @see Predicates for common filter predicates + * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ public void registerWatcher(String jobId, Consumer watcher, Predicate filter) { + + Objects.requireNonNull(jobId, "jobId cannot be null"); + Objects.requireNonNull(watcher, "watcher cannot be null"); + jobWatchers.compute(jobId, (key, existingWatchers) -> { List watchers = Objects.requireNonNullElseGet( existingWatchers, - () -> Collections.synchronizedList(new ArrayList<>()) + CopyOnWriteArrayList::new ); - final var jobWatcher = JobWatcher.builder() + watchers.add(JobWatcher.builder() .watcher(watcher) - .filter(filter != null ? filter : job -> true).build(); + .filter(filter != null ? filter : job -> true) + .build()); + + Logger.debug(this, String.format( + "Added watcher for job %s. Total watchers: %d", jobId, watchers.size())); - watchers.add(jobWatcher); return watchers; }); } /** - * Registers a watcher for a specific job that receives all updates. - * This is a convenience method equivalent to calling {@code registerWatcher(jobId, watcher, null)}. + * Registers a watcher for a specific job. The watcher receives all updates for the job. + * + *

Multiple watchers can be registered for the same job. Watchers are automatically removed + * when a job reaches a final state (completed, cancelled, or removed).

+ * + *

Thread Safety:

+ *
    + *
  • This method is thread-safe and can be called concurrently from multiple threads.
  • + *
  • Internally uses {@link CopyOnWriteArrayList} to store watchers, which provides: + *
      + *
    • Thread-safe reads without synchronization - all iteration operations use an immutable snapshot
    • + *
    • Thread-safe modifications - each modification creates a new internal copy
    • + *
    • Memory consistency effects - actions in a thread prior to placing an object into a + * CopyOnWriteArrayList happen-before actions subsequent to the access or removal + * of that element from the CopyOnWriteArrayList in another thread
    • + *
    + *
  • + *
  • The trade-off is that modifications (adding/removing watchers) are more expensive as they + * create a new copy of the internal array, but this is acceptable since: + *
      + *
    • Reads (notifications) are much more frequent than writes (registering/removing watchers)
    • + *
    • The number of watchers per job is typically small
    • + *
    • Registration/removal of watchers is not in the critical path
    • + *
    + *
  • + *
* * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates - * @throws IllegalArgumentException if jobId or watcher is null + * @throws NullPointerException if jobId or watcher is null + * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ public void registerWatcher(String jobId, Consumer watcher) { registerWatcher(jobId, watcher, null); @@ -150,7 +204,14 @@ private void updateWatchers(Job job) { } } catch (Exception e) { Logger.error(this, "Error notifying job watcher for job " + job.id(), e); + + // Direct remove is thread-safe with CopyOnWriteArrayList watchers.remove(jobWatcher); + + // If this was the last watcher, clean up the map entry + if (watchers.isEmpty()) { + jobWatchers.remove(job.id()); + } } }); } @@ -162,7 +223,13 @@ private void updateWatchers(Job job) { * @param jobId The ID of the job whose watcher is to be removed. */ private void removeWatcher(String jobId) { - jobWatchers.remove(jobId); + + List removed = jobWatchers.remove(jobId); + if (removed != null) { + Logger.debug(this, + String.format("Removed all watchers for job %s. Watchers removed: %d", + jobId, removed.size())); + } } /** @@ -174,6 +241,15 @@ public void onJobStarted(@Observes JobStartedEvent event) { updateWatchers(event.getJob()); } + /** + * Handles the job cancel request event. + * + * @param event The JobCancelRequestEvent. + */ + public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { + updateWatchers(event.getJob()); + } + /** * Handles the job cancelling event. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java index 449ce08aee29..3b4ac80b19de 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java @@ -6,6 +6,16 @@ */ public class JobCancellationException extends RuntimeException { + /** + * Constructs a new JobCancellationException with the specified job ID and cause. + * + * @param jobId The ID of the job that encountered an error during cancellation + * @param cause The underlying cause of the error (can be null) + */ + public JobCancellationException(String jobId, Throwable cause) { + super("Failed to cancel job " + jobId + ".", cause); + } + /** * Constructs a new JobCancellationException with the specified job ID and reason. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java new file mode 100644 index 000000000000..85daa8869659 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java @@ -0,0 +1,64 @@ +package com.dotcms.jobs.business.job; + +import com.dotmarketing.business.Cachable; + +/** + * Interface for caching job objects. This interface extends the {@link Cachable} interface and + * provides methods for adding, retrieving, and removing jobs from the cache. + */ +public interface JobCache extends Cachable { + + /** + * Adds a job to the cache. + * + * @param job the job to be added to the cache + */ + void put(Job job); + + /** + * Adds a job state to the cache. + * + * @param jobId the ID of the job state to add to the cache + * @param jobState the state of the job to add to the cache + */ + void putState(String jobId, JobState jobState); + + /** + * Retrieves a job from the cache by its ID. + * + * @param jobId the ID of the job to be retrieved + * @return the job associated with the given ID, or null if no such job exists in the cache + */ + Job get(String jobId); + + /** + * Retrieves the state of a job from the cache by its ID. + * + * @param jobId the ID of the job whose state is to be retrieved + * @return the state of the job associated with the given ID, or null if no such state exists in + * the cache + */ + JobState getState(String jobId); + + /** + * Removes a job from the cache. + * + * @param job the job to be removed from the cache + */ + void remove(Job job); + + /** + * Removes a job from the cache by its ID. + * + * @param jobId the ID of the job to be removed + */ + void remove(String jobId); + + /** + * Removes a job state from the cache by its ID. + * + * @param jobId the ID of the job state to be removed + */ + void removeState(final String jobId); + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java new file mode 100644 index 000000000000..68d8cf3bb46e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java @@ -0,0 +1,90 @@ +package com.dotcms.jobs.business.job; + +import com.dotcms.util.DotPreconditions; +import com.dotmarketing.business.CacheLocator; +import com.dotmarketing.business.DotCacheAdministrator; +import com.dotmarketing.business.DotCacheException; + +public class JobCacheImpl implements JobCache { + + static final String JOB_BY_ID = "JOB_BY_ID_"; + static final String JOB_STATE_BY_ID = "JOB_STATE_BY_ID_"; + + @Override + public String getPrimaryGroup() { + return "JobCacheImpl"; + } + + @Override + public String[] getGroups() { + return new String[0]; + } + + private String getKeyById(final String id) { + return JOB_BY_ID + id; + } + + private String getStateKeyById(final String id) { + return JOB_STATE_BY_ID + id; + } + + @Override + public void put(final Job job) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.put(getKeyById(job.id()), job, getPrimaryGroup()); + } + + @Override + public void putState(final String jobId, final JobState jobState) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.put(getStateKeyById(jobId), jobState, getPrimaryGroup()); + } + + @Override + public Job get(final String jobId) { + DotPreconditions.checkNotNull(jobId); + + try { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + return (Job) cache.get(getKeyById(jobId), getPrimaryGroup()); + } catch (DotCacheException e) { + return null; + } + } + + @Override + public JobState getState(final String jobId) { + DotPreconditions.checkNotNull(jobId); + + try { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + return (JobState) cache.get(getStateKeyById(jobId), getPrimaryGroup()); + } catch (DotCacheException e) { + return null; + } + } + + @Override + public void remove(final Job job) { + remove(job.id()); + } + + @Override + public void remove(final String jobId) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.remove(getKeyById(jobId), getPrimaryGroup()); + } + + @Override + public void removeState(final String jobId) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.remove(getStateKeyById(jobId), getPrimaryGroup()); + } + + @Override + public void clearCache() { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.flushGroup(getPrimaryGroup()); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java index 0bbdb4c50a0f..c13b8a6d3587 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java @@ -15,11 +15,6 @@ public enum JobState { */ RUNNING, - /** - * The job is currently being canceled. - */ - CANCELLING, - /** * The job has finished executing successfully. */ @@ -30,6 +25,16 @@ public enum JobState { */ FAILED, + /** + * The job is waiting to be canceled. + */ + CANCEL_REQUESTED, + + /** + * The job is currently being canceled. + */ + CANCELLING, + /** * The job was canceled before it could complete. */ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index 5c586ee62a59..3c8b446f8b39 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -1,5 +1,6 @@ package com.dotcms.jobs.business.processor.impl; +import com.dotcms.contenttype.exception.NotFoundInDbException; import com.dotcms.contenttype.model.type.ContentType; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessingException; @@ -20,8 +21,8 @@ import com.dotmarketing.exception.DotHibernateException; import com.dotmarketing.exception.DotSecurityException; import com.dotmarketing.portlets.contentlet.action.ImportAuditUtil; +import com.dotmarketing.portlets.languagesmanager.model.Language; import com.dotmarketing.util.AdminLogger; -import com.dotmarketing.util.FileUtil; import com.dotmarketing.util.ImportUtil; import com.dotmarketing.util.Logger; import com.google.common.hash.Hashing; @@ -138,11 +139,8 @@ public void process(final Job job) throws JobProcessingException { // Validate the job has the required data validate(job); - final var language = getLanguage(job); final var fileToImport = tempFile.get().file; final long totalLines = totalLines(job, fileToImport); - final Charset charset = language == -1 ? - Charset.defaultCharset() : FileUtil.detectEncodeType(fileToImport); // Create a progress callback function final var progressTracker = job.progressTracker().orElseThrow( @@ -154,11 +152,10 @@ public void process(final Job job) throws JobProcessingException { progressTracker.updateProgress(Math.min(1.0f, Math.max(0.0f, progressPercentage))); }; - if (CMD_PREVIEW.equals(command)) { - handlePreview(job, language, fileToImport, charset, user, progressCallback); - } else if (CMD_PUBLISH.equals(command)) { - handlePublish(job, language, fileToImport, charset, user, progressCallback); - } + // Handle the import operation based on the command, by default any command that is not + // "publish" is considered preview. + final boolean isPublish = CMD_PUBLISH.equals(command); + handleImport(!isPublish, job, fileToImport, user, progressCallback); if (!cancellationRequested.get()) { // Ensure the progress is at 100% when the job is done @@ -200,76 +197,34 @@ public Map getResultMetadata(Job job) { } /** - * Handles the preview phase of content import. This method analyzes the CSV file and provides - * information about potential issues without actually importing the content. + * Handles the content import. Depending on the preview flag, this method will either analyze + * the content for potential issues or perform the actual import operation. * + * @param preview Flag indicating whether the operation is a preview or publish * @param job The import job configuration - * @param language The target language for import * @param fileToImport The CSV file to be imported - * @param charset The character encoding of the import file * @param user The user performing the import * @param progressCallback Callback for tracking import progress */ - private void handlePreview(final Job job, long language, final File fileToImport, - final Charset charset, final User user, final LongConsumer progressCallback) { - - try { - try (Reader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(fileToImport), charset))) { - - CsvReader csvReader = createCsvReader(reader); - CsvHeaderInfo headerInfo = processHeadersBasedOnLanguage(job, language, csvReader); - - final var previewResult = generatePreview(job, user, - headerInfo.headers, csvReader, headerInfo.languageCodeColumn, - headerInfo.countryCodeColumn, progressCallback); - resultMetadata = new HashMap<>(previewResult); - } - } catch (Exception e) { + private void handleImport(final boolean preview, final Job job, final File fileToImport, + final User user, final LongConsumer progressCallback) { - try { - HibernateUtil.rollbackTransaction(); - } catch (DotHibernateException he) { - Logger.error(this, he.getMessage(), he); - } - - final var errorMessage = "An error occurred when analyzing the CSV file."; - Logger.error(this, errorMessage, e); - throw new JobProcessingException(job.id(), errorMessage, e); + if (!preview) { + AdminLogger.log( + ImportContentletsProcessor.class, "process", + "Importing Contentlets", user + ); } - } - - /** - * Handles the publish phase of content import. This method performs the actual content import - * operation, creating or updating content based on the CSV file. - * - * @param job The import job configuration - * @param language The target language for import - * @param fileToImport The CSV file to be imported - * @param charset The character encoding of the import file - * @param user The user performing the import - * @param progressCallback Callback for tracking import progress - */ - private void handlePublish(final Job job, long language, final File fileToImport, - final Charset charset, final User user, final LongConsumer progressCallback) { - AdminLogger.log( - ImportContentletsProcessor.class, "process", - "Importing Contentlets", user - ); + try (Reader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(fileToImport), + Charset.defaultCharset()))) { - try { - try (Reader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(fileToImport), charset))) { + CsvReader csvReader = createCsvReader(reader); - CsvReader csvReader = createCsvReader(reader); - CsvHeaderInfo headerInfo = readPublishHeaders(language, csvReader); - - final var importResults = processFile(job, user, headerInfo.headers, csvReader, - headerInfo.languageCodeColumn, headerInfo.countryCodeColumn, - progressCallback); - resultMetadata = new HashMap<>(importResults); - } + final var importResults = processImport(preview, job, user, csvReader, + progressCallback); + resultMetadata = new HashMap<>(importResults); } catch (Exception e) { try { @@ -278,7 +233,8 @@ private void handlePublish(final Job job, long language, final File fileToImport Logger.error(this, he.getMessage(), he); } - final var errorMessage = "An error occurred when importing the CSV file."; + final var errorMessage = String.format("An error occurred when %s the CSV file.", + preview ? "analyzing" : "importing"); Logger.error(this, errorMessage, e); throw new JobProcessingException(job.id(), errorMessage, e); } finally { @@ -288,75 +244,42 @@ private void handlePublish(final Job job, long language, final File fileToImport } /** - * Reads and analyzes the content of the CSV import file to determine potential errors, - * inconsistencies or warnings, and provide the user with useful information regarding the - * contents of the file. - * - * @param job - The {@link Job} being processed. - * @param user - The {@link User} performing this action. - * @param csvHeaders - The headers that make up the CSV file. - * @param csvReader - The actual data contained in the CSV file. - * @param languageCodeHeaderColumn - The column name containing the language code. - * @param countryCodeHeaderColumn - The column name containing the country code. - * @param progressCallback - The callback function to update the progress of the job. - * @throws DotDataException An error occurred when analyzing the CSV file. - */ - private Map> generatePreview(final Job job, final User user, - final String[] csvHeaders, final CsvReader csvReader, - final int languageCodeHeaderColumn, int countryCodeHeaderColumn, - final LongConsumer progressCallback) throws DotDataException { - - final var currentSiteId = getSiteIdentifier(job); - final var currentSiteName = getSiteName(job); - final var contentType = getContentType(job); - final var fields = getFields(job); - final var language = getLanguage(job); - final var workflowActionId = getWorkflowActionId(job); - final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); - - Logger.info(this, "-------- Starting Content Import Preview -------- "); - Logger.info(this, String.format("-> Content Type ID: %s", contentType)); - - return ImportUtil.importFile(0L, currentSiteId, contentType, fields, true, - (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, - countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); - } - - /** - * Executes the content import process after the review process has been run and displayed to - * the user. + * Executes the content import for a preview or publish operation. This method processes the + * CSV file and imports the content into dotCMS or reviews the content for potential issues. * + * @param preview - Flag indicating whether the operation is a preview or publish * @param job - The {@link Job} being processed. * @param user - The {@link User} performing this action. - * @param csvHeaders - The headers that make up the CSV file. * @param csvReader - The actual data contained in the CSV file. - * @param languageCodeHeaderColumn - The column name containing the language code. - * @param countryCodeHeaderColumn - The column name containing the country code. * @param progressCallback - The callback function to update the progress of the job. * @return The status of the content import performed by dotCMS. This provides information * regarding inconsistencies, errors, warnings and/or precautions to the user. * @throws DotDataException An error occurred when importing the CSV file. */ - private Map> processFile(final Job job, final User user, - final String[] csvHeaders, final CsvReader csvReader, - final int languageCodeHeaderColumn, final int countryCodeHeaderColumn, - final LongConsumer progressCallback) throws DotDataException { + private Map> processImport(final boolean preview, final Job job, + final User user, final CsvReader csvReader, final LongConsumer progressCallback) + throws DotDataException, IOException, DotSecurityException { final var currentSiteId = getSiteIdentifier(job); final var currentSiteName = getSiteName(job); - final var contentType = getContentType(job); + final var contentType = findContentType(job); final var fields = getFields(job); - final var language = getLanguage(job); + final var language = findLanguage(job); final var workflowActionId = getWorkflowActionId(job); final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); final var importId = jobIdToLong(job.id()); - Logger.info(this, "-------- Starting Content Import Process -------- "); - Logger.info(this, String.format("-> Content Type ID: %s", contentType)); + // Read headers and process language columns for multilingual imports + CsvHeaderInfo headerInfo = readHeaders(job, language == null, csvReader); - return ImportUtil.importFile(importId, currentSiteId, contentType, fields, false, - (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, - countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); + Logger.info(this, String.format("-------- Starting Content Import %s -------- ", + preview ? "Preview" : "Process")); + Logger.info(this, String.format("-> Content Type: %s", contentType.variable())); + + return ImportUtil.importFile(importId, currentSiteId, contentType.id(), fields, preview, + language == null, user, language == null ? -1 : language.getId(), + headerInfo.headers, csvReader, headerInfo.languageCodeColumn, + headerInfo.countryCodeColumn, workflowActionId, httpReq, progressCallback); } /** @@ -428,26 +351,19 @@ private String getWorkflowActionId(final Job job) { } /** - * Retrieves the language setting from the job parameters. Handles both string and long - * parameter types. + * Retrieves the language from the job parameters. * * @param job The job containing the parameters - * @return The language ID as a long, or -1 if not specified + * @return An optional containing the language string, or an empty optional if not present */ - private long getLanguage(final Job job) { + private Optional getLanguage(final Job job) { if (!job.parameters().containsKey(PARAMETER_LANGUAGE) || job.parameters().get(PARAMETER_LANGUAGE) == null) { - return -1; + return Optional.empty(); } - final Object language = job.parameters().get(PARAMETER_LANGUAGE); - - if (language instanceof String) { - return Long.parseLong((String) language); - } - - return (long) language; + return Optional.of((String) job.parameters().get(PARAMETER_LANGUAGE)); } /** @@ -481,23 +397,38 @@ public String[] getFields(final Job job) { */ private void validate(final Job job) { + // Validating the language (will throw an exception if it doesn't) + final Language language = findLanguage(job); + if (getContentType(job) != null && getContentType(job).isEmpty()) { - Logger.error(this.getClass(), "A Content Type is required"); - throw new JobValidationException(job.id(), "A Content Type is required"); + final var errorMessage = "A Content Type id or variable is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); } else if (getWorkflowActionId(job) != null && getWorkflowActionId(job).isEmpty()) { - Logger.error(this.getClass(), "Workflow action type is required"); - throw new JobValidationException(job.id(), "Workflow action type is required"); + final var errorMessage = "A Workflow Action id is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } else if (language == null && getFields(job).length == 0) { + final var errorMessage = + "A key identifying the different Language versions of the same " + + "content must be defined when importing multilingual files."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); } - // Security measure to prevent invalid attempts to import a host. try { + + // Make sure the content type exist (will throw an exception if it doesn't) + final var contentTypeFound = findContentType(job); + + // Security measure to prevent invalid attempts to import a host. final ContentType hostContentType = APILocator.getContentTypeAPI( - APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME - ); - final boolean isHost = (hostContentType.id().equals(getContentType(job))); + APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); + final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); if (isHost) { - Logger.error(this, "Invalid attempt to import a host."); - throw new JobValidationException(job.id(), "Invalid attempt to import a host."); + final var errorMessage = "Invalid attempt to import a host."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); } } catch (DotSecurityException | DotDataException e) { throw new JobProcessingException(job.id(), "Error validating content type", e); @@ -547,28 +478,33 @@ private Long totalLines(final Job job, final File dotTempFile) { } /** - * Reads and processes headers for publishing operation. + * Reads and processes headers from the CSV file. Handles both single and multilingual content + * imports. * - * @param language The target language for import - * @param csvreader The CSV reader containing the file data + * @param job The current import job + * @param isMultilingual Flag indicating whether the import is multilingual + * @param csvReader The CSV reader containing the file data * @return CsvHeaderInfo containing processed header information * @throws IOException if an error occurs reading the CSV file */ - private CsvHeaderInfo readPublishHeaders(long language, CsvReader csvreader) + private CsvHeaderInfo readHeaders(final Job job, boolean isMultilingual, CsvReader csvReader) throws IOException { - if (language == -1 && csvreader.readHeaders()) { - return findLanguageColumnsInHeaders(csvreader.getHeaders()); + + if (isMultilingual) { + return processMultilingualHeaders(job, csvReader); } + return new CsvHeaderInfo(null, -1, -1); } /** * Locates language-related columns in CSV headers. * + * @param job The current import job * @param headers Array of CSV header strings * @return CsvHeaderInfo containing the positions of language and country code columns */ - private CsvHeaderInfo findLanguageColumnsInHeaders(String[] headers) { + private CsvHeaderInfo findLanguageColumnsInHeaders(Job job, String[] headers) { int languageCodeColumn = -1; int countryCodeColumn = -1; @@ -585,6 +521,7 @@ private CsvHeaderInfo findLanguageColumnsInHeaders(String[] headers) { } } + validateLanguageColumns(job, languageCodeColumn, countryCodeColumn); return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); } @@ -600,39 +537,6 @@ private CsvReader createCsvReader(final Reader reader) { return csvreader; } - /** - * Processes CSV headers based on the specified language configuration. - * - * @param job The current import job - * @param language The target language for import - * @param csvReader The CSV reader to process headers from - * @return CsvHeaderInfo containing processed header information - * @throws IOException if an error occurs reading the CSV file - */ - private CsvHeaderInfo processHeadersBasedOnLanguage(final Job job, final long language, - final CsvReader csvReader) throws IOException { - if (language != -1) { - validateLanguage(job, language); - return new CsvHeaderInfo(null, -1, -1); - } - - return processMultilingualHeaders(job, csvReader); - } - - /** - * Validates the language configuration for import operations. - * - * @param job The current import job - * @param language The language identifier to validate - */ - private void validateLanguage(Job job, long language) { - if (language == 0) { - final var errorMessage = "Please select a valid Language."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - } - /** * Processes headers for multilingual content imports. * @@ -644,14 +548,6 @@ private void validateLanguage(Job job, long language) { private CsvHeaderInfo processMultilingualHeaders(final Job job, final CsvReader csvReader) throws IOException { - if (getFields(job).length == 0) { - final var errorMessage = - "A key identifying the different Language versions of the same " - + "content must be defined when importing multilingual files."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - if (!csvReader.readHeaders()) { final var errorMessage = "An error occurred when attempting to read the CSV file headers."; Logger.error(this, errorMessage); @@ -659,35 +555,7 @@ private CsvHeaderInfo processMultilingualHeaders(final Job job, final CsvReader } String[] headers = csvReader.getHeaders(); - return findLanguageColumns(job, headers); - } - - /** - * Locates language-related columns in CSV headers. - * - * @param headers Array of CSV header strings - * @return CsvHeaderInfo containing the positions of language and country code columns - */ - private CsvHeaderInfo findLanguageColumns(Job job, String[] headers) - throws JobProcessingException { - - int languageCodeColumn = -1; - int countryCodeColumn = -1; - - for (int column = 0; column < headers.length; ++column) { - if (headers[column].equals(LANGUAGE_CODE_HEADER)) { - languageCodeColumn = column; - } - if (headers[column].equals(COUNTRY_CODE_HEADER)) { - countryCodeColumn = column; - } - if (languageCodeColumn != -1 && countryCodeColumn != -1) { - break; - } - } - - validateLanguageColumns(job, languageCodeColumn, countryCodeColumn); - return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); + return findLanguageColumnsInHeaders(job, headers); } /** @@ -708,6 +576,85 @@ private void validateLanguageColumns(Job job, int languageCodeColumn, int countr } } + /** + * Retrieves the existing content type based on an id or variable. + * + * @param job The current import job. + * @return The existing content type if found, otherwise fails with an exception. + * @throws DotSecurityException If there are security restrictions preventing the evaluation. + */ + private ContentType findContentType(final Job job) + throws DotSecurityException { + + final var contentTypeIdOrVar = getContentType(job); + final User user; + + // Retrieving the user requesting the import + try { + user = getUser(job); + } catch (DotDataException e) { + final var errorMessage = "Error retrieving user."; + Logger.error(this.getClass(), errorMessage); + throw new JobProcessingException(job.id(), errorMessage, e); + } + + try { + return APILocator.getContentTypeAPI(user, true) + .find(contentTypeIdOrVar); + } catch (NotFoundInDbException e) { + final var errorMessage = String.format( + "Content Type [%s] not found.", contentTypeIdOrVar + ); + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } catch (DotDataException e) { + final var errorMessage = String.format( + "Error finding Content Type [%s].", contentTypeIdOrVar + ); + Logger.error(this.getClass(), errorMessage); + throw new JobProcessingException(job.id(), errorMessage, e); + } + } + + /** + * Retrieves the existing language based on an id or ISO code. + * + * @param job The current import job. + * @return The existing language if found, otherwise fails with an exception. + */ + private Language findLanguage(final Job job) { + + // Read the language from the job parameters + final var languageIsoOrIdOptional = getLanguage(job); + if (languageIsoOrIdOptional.isEmpty()) { + return null; + } + + final var languageIsoOrId = languageIsoOrIdOptional.get(); + if (languageIsoOrId.equals("-1")) { + return null; + } + + // Retrieve the language based on the provided ISO code or ID + Language foundLanguage; + if (!languageIsoOrId.contains("-")) { + foundLanguage = APILocator.getLanguageAPI().getLanguage(languageIsoOrId); + } else { + final String[] codes = languageIsoOrId.split("[_|-]"); + foundLanguage = APILocator.getLanguageAPI().getLanguage(codes[0], codes[1]); + } + + if (foundLanguage != null && foundLanguage.getId() > 0) { + return foundLanguage; + } + + final var errorMessage = String.format( + "Language [%s] not found.", languageIsoOrId + ); + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } + /** * Container class for CSV header information, particularly for handling language-related * columns in multilingual imports. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java index 36245a0c4ba6..1fc4b9708984 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java @@ -45,6 +45,12 @@ public void process(Job job) { final DotTempFile dotTempFile = tempFile.get(); doReadLargeFile(dotTempFile, nLines, maxLines, job); + + if (!working) { + Logger.info(this.getClass(), "Job cancelled: " + job.id()); + // Adding some delay to simulate some cancellation processing, this demo is too fast + delay(3000); + } } /** @@ -77,7 +83,7 @@ private void doReadLargeFile(DotTempFile dotTempFile, int nLines, int maxLines , if (lineCount == nLines) { lineCount = 0; // Reset the counter Logger.debug(this.getClass(), line); - delay(); + delay(1000); } final float progressPercentage = ((float) readCount / totalCount); progressTracker.ifPresent(tracker -> tracker.updateProgress(progressPercentage)); @@ -112,9 +118,9 @@ private Long countLines(DotTempFile dotTempFile) { return totalCount; } - private void delay() { + private void delay(final long millis) { Try.of(()->{ - Thread.sleep(1000); + Thread.sleep(millis); return null; }).onFailure(e->Logger.error(this.getClass(), "Error during delay", e)); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index af9d434bfcb7..0e792c1424fc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -40,6 +40,20 @@ String createJob(String queueName, Map parameters) */ Job getJob(String jobId) throws JobNotFoundException, JobQueueDataException; + /** + * Retrieves the current state of a specific job. + *

+ * If only the status is required, this method has better performance than + * {@link #getJob(String)} as it uses a cache that is only cleared on status changes, whereas + * {@link #getJob(String)} uses a cache that is cleared on any job change. + * + * @param jobId The ID of the job whose state is being queried. + * @return The current state of the job as a JobState enum. + * @throws JobNotFoundException if the job with the given ID is not found. + * @throws JobQueueDataException if there's a data storage error while fetching the job state. + */ + JobState getJobState(final String jobId) throws JobNotFoundException, JobQueueDataException; + /** * Retrieves a list of active jobs for a specific queue. * @@ -176,10 +190,10 @@ List getUpdatedJobsSince(Set jobIds, LocalDateTime since) * Checks if a job has ever been in a specific state. * * @param jobId The ID of the job to check. - * @param state The state to check for. + * @param states The states to check for. * @return true if the job has been in the specified state, false otherwise. * @throws JobQueueDataException if there's an error accessing the job data. */ - boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDataException; + boolean hasJobBeenInState(String jobId, JobState... states) throws JobQueueDataException; } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index dfc179106563..10298c44e578 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -8,9 +8,11 @@ import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; import com.dotmarketing.business.APILocator; +import com.dotmarketing.business.CacheLocator; import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; +import com.dotmarketing.util.UtilMethods; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -148,7 +150,7 @@ public class PostgresJobQueue implements JobQueue { + " WHERE id = ?"; private static final String HAS_JOB_BEEN_IN_STATE_QUERY = "SELECT " - + "EXISTS (SELECT 1 FROM job_history WHERE job_id = ? AND state = ?)"; + + "EXISTS (SELECT 1 FROM job_history WHERE job_id = ? AND state IN $??$)"; private static final String COLUMN_TOTAL_COUNT = "total_count"; @@ -224,6 +226,12 @@ public String createJob(final String queueName, final Map parame @Override public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataException { + // Check cache first + Job job = CacheLocator.getJobCache().get(jobId); + if (UtilMethods.isSet(job)) { + return job; + } + try { DotConnect dc = new DotConnect(); dc.setSQL(SELECT_JOB_BY_ID_QUERY); @@ -231,7 +239,13 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE List> results = dc.loadObjectResults(); if (!results.isEmpty()) { - return DBJobTransformer.toJob(results.get(0)); + + job = DBJobTransformer.toJob(results.get(0)); + + // Cache the job + CacheLocator.getJobCache().put(job); + + return job; } Logger.warn(this, "Job with id: " + jobId + " not found"); @@ -242,6 +256,24 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE } } + @Override + public JobState getJobState(final String jobId) + throws JobNotFoundException, JobQueueDataException { + + // Check cache first + JobState jobState = CacheLocator.getJobCache().getState(jobId); + if (UtilMethods.isSet(jobState)) { + return jobState; + } + + final var job = getJob(jobId); + + // Cache the job state + CacheLocator.getJobCache().putState(job.id(), job.state()); + + return job.state(); + } + @Override public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws JobQueueDataException { @@ -463,6 +495,11 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { || job.state() == JobState.CANCELED) { removeJobFromQueue(job.id()); } + + // Cleanup cache + CacheLocator.getJobCache().remove(job); + CacheLocator.getJobCache().removeState(job.id()); + } catch (DotDataException e) { Logger.error(this, "Database error while updating job status", e); throw new JobQueueDataException("Database error while updating job status", e); @@ -554,6 +591,10 @@ public void updateJobProgress(final String jobId, final float progress) dc.addParam(Timestamp.valueOf(LocalDateTime.now())); dc.addParam(jobId); dc.loadResult(); + + // Cleanup cache + CacheLocator.getJobCache().remove(jobId); + } catch (DotDataException e) { Logger.error(this, "Database error while updating job progress", e); throw new JobQueueDataException("Database error while updating job progress", e); @@ -575,13 +616,25 @@ public void removeJobFromQueue(final String jobId) throws JobQueueDataException } @Override - public boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDataException { + public boolean hasJobBeenInState(final String jobId, final JobState... states) + throws JobQueueDataException { + + if (states.length == 0) { + return false; + } + + String parameters = String.join(", ", Collections.nCopies(states.length, "?")); + + var query = HAS_JOB_BEEN_IN_STATE_QUERY + .replace(REPLACE_TOKEN_PARAMETERS, "(" + parameters + ")"); try { DotConnect dc = new DotConnect(); - dc.setSQL(HAS_JOB_BEEN_IN_STATE_QUERY); + dc.setSQL(query); dc.addParam(jobId); - dc.addParam(state.name()); + for (JobState state : states) { + dc.addParam(state.name()); + } List> results = dc.loadObjectResults(); if (!results.isEmpty()) { @@ -604,7 +657,7 @@ public boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDa * @return A JobPaginatedResult instance * @throws DotDataException If there is an error loading the query results */ - private static JobPaginatedResult jobPaginatedResult( + private JobPaginatedResult jobPaginatedResult( int page, int pageSize, DotConnect dc) throws DotDataException { final var results = dc.loadObjectResults(); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java index 6a6ec3650e4e..c19e19056153 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java @@ -12,6 +12,8 @@ import com.dotmarketing.util.UtilMethods; import com.dotmarketing.util.WebKeys; import com.liferay.portal.model.User; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.List; import java.util.Map; import java.util.Optional; @@ -102,4 +104,20 @@ public static HttpServletRequest generateMockRequest(final User user, final Stri return requestProxy; } + /** + * Helper method to round the progress to 3 decimal places. + * + * @param progress The progress value to round + * @return The rounded progress value + */ + public static float roundedProgress(final float progress) { + + // Round the progress to 3 decimal places + final var roundedProgress = BigDecimal.valueOf(progress) + .setScale(3, RoundingMode.HALF_UP) + .floatValue(); + + return Math.round(roundedProgress * 1000f) / 1000f; + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index 1902207ce186..534986fb7b37 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -1,5 +1,7 @@ package com.dotcms.rest.api.v1.job; +import static com.dotcms.jobs.business.util.JobUtil.roundedProgress; + import com.dotcms.jobs.business.api.JobProcessorScanner; import com.dotcms.jobs.business.api.JobQueueManagerAPI; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; @@ -8,7 +10,6 @@ import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.Queue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; @@ -19,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.liferay.portal.model.User; +import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; import java.time.format.DateTimeFormatter; @@ -32,7 +34,10 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MediaType; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.OutboundEvent; /** * Helper class for interacting with the job queue system. This class provides methods for creating, cancelling, and listing jobs. @@ -241,7 +246,7 @@ void watchJob(String jobId, Consumer watcher) { JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -273,7 +278,7 @@ JobPaginatedResult getJobs(int page, int pageSize) { JobPaginatedResult getActiveJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -289,7 +294,7 @@ JobPaginatedResult getActiveJobs(int page, int pageSize) { JobPaginatedResult getCompletedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCompletedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching completed jobs", e); } return JobPaginatedResult.builder().build(); @@ -305,7 +310,7 @@ JobPaginatedResult getCompletedJobs(int page, int pageSize) { JobPaginatedResult getCanceledJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCanceledJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching canceled jobs", e); } return JobPaginatedResult.builder().build(); @@ -321,7 +326,7 @@ JobPaginatedResult getCanceledJobs(int page, int pageSize) { JobPaginatedResult getFailedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getFailedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching failed jobs", e); } return JobPaginatedResult.builder().build(); @@ -363,9 +368,9 @@ void handleUploadIfPresent(final JobParams form, Map params, Htt * @param job The job * @return true if the job is watchable, false otherwise */ - public boolean isNotWatchable(Job job){ + boolean isNotWatchable(Job job) { return JobState.PENDING != job.state() && JobState.RUNNING != job.state() - && JobState.CANCELLING != job.state(); + && JobState.CANCEL_REQUESTED != job.state() && JobState.CANCELLING != job.state(); } /** @@ -373,14 +378,82 @@ public boolean isNotWatchable(Job job){ * @param job The job * @return The status info */ - public Map getJobStatusInfo(Job job) { + Map getJobStatusInfo(Job job) { final DateTimeFormatter isoFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME; return Map.of( - "startedAt", job.startedAt().map(isoFormatter::format).orElse("N/A"), - "finishedAt", job.completedAt().map(isoFormatter::format).orElse("N/A"), "state", job.state(), - "progress", job.progress() + "progress", roundedProgress(job.progress()), + "startedAt", job.startedAt().map(isoFormatter::format).orElse("N/A"), + "finishedAt", job.completedAt().map(isoFormatter::format).orElse("N/A") ); } + /** + * Get the job for the given ID + * + * @param jobId The ID of the job + * @return The job or null if it doesn't exist + */ + Job getJobForSSE(final String jobId) throws DotDataException { + + Job job = null; + + try { + job = getJob(jobId); + } catch (DoesNotExistException e) { + // ignore + } + + return job; + } + + /** + * Send an error event and close the connection + * + * @param errorName The name of the error event + * @param errorCode The error code + * @param eventOutput The event output + */ + void sendErrorAndClose(final String errorName, final String errorCode, + final EventOutput eventOutput) { + + try { + OutboundEvent event = new OutboundEvent.Builder() + .mediaType(MediaType.TEXT_HTML_TYPE) + .name(errorName) + .data(String.class, errorCode) + .build(); + eventOutput.write(event); + closeSSEConnection(eventOutput); + } catch (IOException e) { + Logger.error(this, "Error sending error event", e); + closeSSEConnection(eventOutput); + } + } + + /** + * Close the SSE connection + * + * @param eventOutput The event output + */ + void closeSSEConnection(final EventOutput eventOutput) { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(this, "Error closing SSE connection", e); + } + } + + /** + * Check if a job is in a terminal state + * + * @param state The state of the job + * @return true if the job is in a terminal state, false otherwise + */ + boolean isTerminalState(final JobState state) { + return state == JobState.COMPLETED || + state == JobState.FAILED || + state == JobState.CANCELED; + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index c298d24d50a6..d2b537e94f03 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -4,15 +4,14 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; -import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; -import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import graphql.VisibleForTesting; import java.io.IOException; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.BeanParam; @@ -34,18 +33,21 @@ public class JobQueueResource { private final WebResource webResource; - private final JobQueueHelper helper; + private final SSEConnectionManager sseConnectionManager; @Inject - public JobQueueResource(final JobQueueHelper helper) { - this(new WebResource(), helper); + public JobQueueResource(final JobQueueHelper helper, + final SSEConnectionManager sseConnectionManager) { + this(new WebResource(), helper, sseConnectionManager); } @VisibleForTesting - public JobQueueResource(WebResource webResource, JobQueueHelper helper) { + public JobQueueResource(WebResource webResource, JobQueueHelper helper, + SSEConnectionManager sseConnectionManager) { this.webResource = webResource; this.helper = helper; + this.sseConnectionManager = sseConnectionManager; } @POST @@ -162,30 +164,29 @@ public EventOutput monitorJob(@Context HttpServletRequest request, .rejectWhenNoUser(true) .init(); - Job job = null; + final EventOutput eventOutput = new EventOutput(); + try { - job = helper.getJob(jobId); - } catch (DotDataException | DoesNotExistException e) { - // ignore - } + Job job = helper.getJobForSSE(jobId); - final EventOutput eventOutput = new EventOutput(); + if (job == null) { + helper.sendErrorAndClose("job-not-found", "404", eventOutput); + return eventOutput; + } + + if (helper.isNotWatchable(job)) { + helper.sendErrorAndClose(String.format("job-not-watchable [%s]", + job.state()), "400", eventOutput); + return eventOutput; + } - if (job == null || helper.isNotWatchable(job)) { - try { - OutboundEvent event = new OutboundEvent.Builder() - .mediaType(MediaType.TEXT_HTML_TYPE) - .name("job-not-found") - .data(String.class, "404") - .build(); - eventOutput.write(event); - eventOutput.close(); - } catch (IOException e) { - Logger.error(this, "Error closing SSE connection", e); + if (!sseConnectionManager.canAcceptNewConnection(jobId)) { + helper.sendErrorAndClose("too-many-connections", "429", eventOutput); + return eventOutput; } - } else { + // Callback for watching job updates and sending them to the client - helper.watchJob(job.id(), watched -> { + Consumer jobWatcher = watched -> { if (!eventOutput.isClosed()) { try { OutboundEvent event = new OutboundEvent.Builder() @@ -194,13 +195,30 @@ public EventOutput monitorJob(@Context HttpServletRequest request, .data(Map.class, helper.getJobStatusInfo(watched)) .build(); eventOutput.write(event); + + // If job is complete/failed/cancelled, close the connection + if (helper.isTerminalState(watched.state())) { + sseConnectionManager.closeJobConnections(jobId); + } + } catch (IOException e) { Logger.error(this, "Error writing SSE event", e); - throw new DotRuntimeException(e); + sseConnectionManager.closeJobConnections(jobId); } } - }); + }; + + // Register the connection and watcher + sseConnectionManager.addConnection(jobId, eventOutput); + + // Start watching the job + helper.watchJob(job.id(), jobWatcher); + + } catch (DotDataException e) { + Logger.error(this, "Error setting up job monitor", e); + helper.closeSSEConnection(eventOutput); } + return eventOutput; } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java new file mode 100644 index 000000000000..104b824f6429 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java @@ -0,0 +1,270 @@ +package com.dotcms.rest.api.v1.job; + +import com.dotmarketing.util.Config; +import com.dotmarketing.util.Logger; +import io.vavr.Lazy; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import org.glassfish.jersey.media.sse.EventOutput; + +/** + * Manages Server-Sent Events (SSE) connections for job monitoring. This class provides + * functionality for tracking, limiting, and cleaning up SSE connections across multiple jobs. + * + *

Key features include: + *

    + *
  • Connection limits per job and system-wide
  • + *
  • Automatic connection timeout and cleanup
  • + *
  • Thread-safe connection management
  • + *
  • Proper resource cleanup on shutdown
  • + *
+ * + *

Configuration properties: + *

    + *
  • {@code MAX_SSE_CONNECTIONS_PER_JOB} - Maximum number of concurrent connections per job (default: 5)
  • + *
  • {@code MAX_SSE_TOTAL_CONNECTIONS} - Maximum total concurrent connections across all jobs (default: 50)
  • + *
  • {@code SSE_CONNECTION_TIMEOUT_MINUTES} - Connection timeout in minutes (default: 30)
  • + *
+ * + *

Usage example: + *

{@code
+ * SSEConnectionManager manager = new SSEConnectionManager();
+ *
+ * // Check if new connection can be accepted
+ * if (manager.canAcceptNewConnection(jobId)) {
+ *     // Add new connection
+ *     manager.addConnection(jobId, eventOutput);
+ * }
+ *
+ * // Close connections when job completes
+ * manager.closeJobConnections(jobId);
+ * }
+ */ +@ApplicationScoped +public class SSEConnectionManager { + + // Add status tracking + private volatile boolean isShutdown = false; + + private static final Lazy MAX_SSE_CONNECTIONS_PER_JOB = + Lazy.of(() -> Config.getIntProperty("MAX_SSE_CONNECTIONS_PER_JOB", 5)); + + private static final Lazy MAX_SSE_TOTAL_CONNECTIONS = + Lazy.of(() -> Config.getIntProperty("MAX_SSE_TOTAL_CONNECTIONS", 50)); + + private static final Lazy SSE_CONNECTION_TIMEOUT_MINUTES = + Lazy.of(() -> Config.getIntProperty("SSE_CONNECTION_TIMEOUT_MINUTES", 30)); + + private final ConcurrentMap> jobConnections = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService timeoutExecutor = + Executors.newSingleThreadScheduledExecutor(); + + /** + * Shuts down the SSE connection manager and cleans up all resources. This method closes all + * active connections and shuts down the timeout executor. After shutdown, no new connections + * can be added. + */ + @PreDestroy + public void shutdown() { + + isShutdown = true; + + try { + closeAllConnections(); + } finally { + timeoutExecutor.shutdown(); + try { + if (!timeoutExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + timeoutExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + timeoutExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Checks if a new SSE connection can be accepted for the given job. This method verifies both + * per-job and system-wide connection limits. + * + * @param jobId The ID of the job for which to check connection availability + * @return true if a new connection can be accepted, false otherwise + */ + public boolean canAcceptNewConnection(String jobId) { + if (getTotalConnections() >= MAX_SSE_TOTAL_CONNECTIONS.get()) { + return false; + } + + Set connections = jobConnections.get(jobId); + return connections == null || connections.size() < MAX_SSE_CONNECTIONS_PER_JOB.get(); + } + + /** + * Adds a new SSE connection for a job. The connection will be automatically closed after the + * configured timeout period. + * + * @param jobId The ID of the job to monitor + * @param eventOutput The EventOutput instance representing the SSE connection + * @throws IllegalStateException if the manager is shut down + */ + public void addConnection(String jobId, EventOutput eventOutput) { + + if (isShutdown) { + throw new IllegalStateException("SSEConnectionManager is shut down"); + } + + SSEConnection connection = new SSEConnection(jobId, eventOutput); + jobConnections.computeIfAbsent(jobId, k -> ConcurrentHashMap.newKeySet()).add(connection); + + // Schedule connection timeout + timeoutExecutor.schedule(() -> { + try { + removeConnection(jobId, connection); + } catch (Exception e) { + Logger.error(this, "Error removing expired connection", e); + } + }, SSE_CONNECTION_TIMEOUT_MINUTES.get(), TimeUnit.MINUTES); + } + + /** + * Removes a specific SSE connection for a job. If this was the last connection for the job, the + * job entry is removed from tracking. + * + * @param jobId The ID of the job + * @param connection The connection to remove + */ + public void removeConnection(String jobId, SSEConnection connection) { + Set connections = jobConnections.get(jobId); + if (connections != null) { + connections.remove(connection); + connection.close(); + + if (connections.isEmpty()) { + jobConnections.remove(jobId); + } + } + } + + /** + * Gets the total number of active SSE connections across all jobs. + * + * @return The total number of active connections + */ + private int getTotalConnections() { + return jobConnections.values().stream() + .mapToInt(Set::size) + .sum(); + } + + /** + * Closes all active SSE connections and clears connection tracking. + */ + private void closeAllConnections() { + jobConnections.values().forEach(connections -> + connections.forEach(SSEConnection::close) + ); + jobConnections.clear(); + } + + /** + * Closes all SSE connections for a specific job. + * + * @param jobId The ID of the job whose connections should be closed + */ + public void closeJobConnections(String jobId) { + Set connections = jobConnections.remove(jobId); + if (connections != null) { + connections.forEach(SSEConnection::close); + } + } + + /** + * Gets the number of active connections for a specific job. + * + * @param jobId The ID of the job + * @return The number of active connections for the job + */ + public int getConnectionCount(String jobId) { + Set connections = jobConnections.get(jobId); + return connections != null ? connections.size() : 0; + } + + /** + * Gets information about the current state of SSE connections. + * + * @return A map containing connection statistics: + * - totalConnections: Total number of active connections + * - activeJobs: Number of jobs with active connections + */ + public Map getConnectionInfo() { + return Map.of( + "totalConnections", getTotalConnections(), + "activeJobs", jobConnections.size() + ); + } + + /** + * Represents a single SSE connection for a job. Each connection tracks its creation time and + * handles its own cleanup. + */ + public static class SSEConnection { + + private final String jobId; + private final EventOutput eventOutput; + private final LocalDateTime createdAt; + + /** + * Creates a new SSE connection. + * + * @param jobId The ID of the job this connection is monitoring + * @param eventOutput The EventOutput instance representing the SSE connection + */ + public SSEConnection(String jobId, EventOutput eventOutput) { + this.jobId = jobId; + this.eventOutput = eventOutput; + this.createdAt = LocalDateTime.now(); + } + + /** + * Closes this SSE connection. + */ + public void close() { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(SSEConnection.class, "Error closing SSE connection", e); + } + } + + /** + * Checks if this connection has exceeded its timeout period. + * + * @return true if the connection has expired, false otherwise + */ + public boolean isExpired() { + return LocalDateTime.now().isAfter( + createdAt.plusMinutes(SSE_CONNECTION_TIMEOUT_MINUTES.get())); + } + + /** + * Gets the ID of the job this connection is monitoring. + * + * @return The job ID + */ + public String getJobId() { + return jobId; + } + } + +} diff --git a/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java b/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java index 81bd05fe5b06..d99cbf8f3ed5 100644 --- a/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java +++ b/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java @@ -15,6 +15,8 @@ import com.dotcms.experiments.business.ExperimentsCacheImpl; import com.dotcms.graphql.GraphQLCache; import com.dotcms.graphql.business.GraphQLSchemaCache; +import com.dotcms.jobs.business.job.JobCache; +import com.dotcms.jobs.business.job.JobCacheImpl; import com.dotcms.notifications.business.NewNotificationCache; import com.dotcms.notifications.business.NewNotificationCacheImpl; import com.dotcms.publisher.assets.business.PushedAssetsCache; @@ -364,6 +366,13 @@ public static ExperimentsCache getExperimentsCache() { return (ExperimentsCache) getInstance(CacheIndex.EXPERIMENTS_CACHE); } + /** + * This will get you an instance of the {@link JobCache} singleton cache. + */ + public static JobCache getJobCache() { + return (JobCache) getInstance(CacheIndex.JOB_CACHE); + } + /** * The legacy cache administrator will invalidate cache entries within a cluster * on a put where the non legacy one will not. @@ -476,8 +485,8 @@ enum CacheIndex VariantCache("VariantCache"), EXPERIMENTS_CACHE("ExperimentsCache"), CHAINABLE_404_STORAGE_CACHE("Chainable404StorageCache"), - - Javascript("Javascript"); + Javascript("Javascript"), + JOB_CACHE("JobCache"); Cachable create() { switch(this) { @@ -533,7 +542,7 @@ Cachable create() { case EXPERIMENTS_CACHE: return new ExperimentsCacheImpl(); case CHAINABLE_404_STORAGE_CACHE: return new Chainable404StorageCache(); case Javascript: return new JsCache(); - + case JOB_CACHE: return new JobCacheImpl(); } throw new AssertionError("Unknown Cache index: " + this); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 210028bdee69..97c3387fde78 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -28,7 +28,6 @@ import javax.inject.Inject; import org.awaitility.Awaitility; import org.jboss.weld.junit5.EnableWeld; -import org.jboss.weld.junit5.WeldJunit5Extension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -38,7 +37,6 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.extension.ExtendWith; /** * Integration tests for the JobQueueManagerAPI. @@ -50,6 +48,8 @@ @TestInstance(Lifecycle.PER_CLASS) public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBaseTest { + private static int attempts = 0; + @Inject JobQueueManagerAPI jobQueueManagerAPI; @@ -85,6 +85,9 @@ void reset() { if(null != jobQueueManagerAPI) { jobQueueManagerAPI.getCircuitBreaker().reset(); } + + // Reset retry attempts + attempts = 0; } /** @@ -185,8 +188,6 @@ void test_JobRetry() throws Exception { }); } - - /** * Method to test: Job failure handling in JobQueueManagerAPI * Given Scenario: A job is created that is designed to fail @@ -473,7 +474,6 @@ public Map getResultMetadata(Job job) { static class RetryingJobProcessor implements JobProcessor { public static final int MAX_RETRIES = 3; - private int attempts = 0; public RetryingJobProcessor() { // needed for instantiation purposes diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 8525440886d3..ccfe76f90c60 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -13,9 +13,11 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -23,6 +25,7 @@ import static org.mockito.Mockito.when; import com.dotcms.jobs.business.api.events.EventProducer; +import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; @@ -43,6 +46,8 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.system.event.local.business.LocalSystemEventsAPI; +import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; import java.util.ArrayList; @@ -65,6 +70,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import org.mockito.MockedStatic; public class JobQueueManagerAPITest { @@ -1182,116 +1188,170 @@ public void test_CircuitBreaker_Reset() throws Exception { public void test_simple_cancelJob2() throws DotDataException, JobQueueException, JobCancellationException { - // Set up the job queue manager to return our mock cancellable processor - final String testQueue = "CancellableTestQueue"; + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { - // Create a mock cancellable processor - final String jobIdIn = "job2"; - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); - // Create a mock job - Job mockJob = mock(Job.class); - when(mockJobQueue.getJob(jobIdIn)).thenReturn(mockJob); - when(mockJob.queueName()).thenReturn(testQueue); - when(mockJob.id()).thenReturn(jobIdIn); - when(mockJob.withState(any())).thenReturn(mockJob); + // Set up the job queue manager to return our mock cancellable processor + final String testQueue = "CancellableTestQueue"; + final String jobIdIn = "job2"; - jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Create a job so we can cancel it - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); - assertEquals(jobIdIn, jobIdOut); - // Perform the cancellation - jobQueueManagerAPI.cancelJob(jobIdOut); - // Verify that the cancel method was called on our mock processor - verify(mockCancellableProcessor).cancel(mockJob); - } + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobIdIn); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); - /** - * Method to test: Job cancellation in JobQueueManagerAPI - * Given Scenario: Running job is canceled - * ExpectedResult: Job is successfully canceled and its state transitions are correct - */ - @Test - public void test_complex_cancelJob() throws Exception { + // Mock job queue operations + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); + when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); + doNothing().when(mockJobQueue).updateJobStatus(any(Job.class)); - // Create a mock job - Job mockJob = mock(Job.class); - final String jobId = "job5644"; - when(mockJob.id()).thenReturn(jobId); - final String testQueue = "myTestQueue"; - when(mockJob.queueName()).thenReturn(testQueue); + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); - // Configure JobQueue - when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCELLING))).thenReturn(true); - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); - // List to capture job state updates - List stateUpdates = new CopyOnWriteArrayList<>(); - - when(mockJob.withState(any())).thenAnswer(inv -> { - stateUpdates.add(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsRunning()).thenAnswer(inv -> { - stateUpdates.add(JobState.RUNNING); - return mockJob; - }); - when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.CANCELED); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.markAsFailed(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.FAILED); - return mockJob; - }); - when(mockJob.progress()).thenReturn(0f); - when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); + jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Register the test processor - jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + // Create a job so we can cancel it + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + assertEquals(jobIdIn, jobIdOut); - // Configure circuit breaker - when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Perform the cancellation + jobQueueManagerAPI.cancelJob(jobIdOut); - // Start the job queue manager - jobQueueManagerAPI.start(); + // Verify that the cancel method was called on our mock processor + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + verify(mockCancellableProcessor).cancel(mockJob); + }); - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + } + } - final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); - assertTrue(instance.isPresent()); - // Use our TestJobProcessor - final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + /** + * Method to test: Job cancellation in JobQueueManagerAPI + * Given Scenario: Running job is canceled + * ExpectedResult: Job is successfully canceled and its state transitions are correct + */ + @Test + public void test_complex_cancelJob() throws Exception { - // Wait for the job to start processing - Awaitility.await() - .atMost(5, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { + + final String testQueue = "myTestQueue"; + final String jobId = "job5644"; + + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobId); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); + + // Configure JobQueue + when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); + when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); + when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCEL_REQUESTED), + eq(JobState.CANCELLING))).thenReturn(true); + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + + // List to capture job state updates + List stateUpdates = new CopyOnWriteArrayList<>(); + + when(mockJob.withState(any())).thenAnswer(inv -> { + stateUpdates.add(inv.getArgument(0)); + return mockJob; + }); + when(mockJob.markAsRunning()).thenAnswer(inv -> { + stateUpdates.add(JobState.RUNNING); + return mockJob; + }); + when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.CANCELED); + return mockJob; + }); + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.COMPLETED); + return mockJob; + }); + when(mockJob.markAsFailed(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.FAILED); + return mockJob; + }); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); + when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn( + mockJob); + + when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) + .thenAnswer(invocation -> Collections.singletonList(mockJob)); + + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); + + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); + + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); + + // Register the test processor + jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + + // Configure circuit breaker + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + + // Start the job queue manager + jobQueueManagerAPI.start(); + + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + + final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); + assertTrue(instance.isPresent()); + // Use our TestJobProcessor + final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + + // Wait for the job to start processing + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); - // Cancel the job - jobQueueManagerAPI.cancelJob(jobId); + // Cancel the job + jobQueueManagerAPI.cancelJob(jobId); - // Wait for the job to complete (which should be due to cancellation) - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingCompleted(100, TimeUnit.MILLISECONDS)); + // Wait for the job to complete (which should be due to cancellation) + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingCompleted(100, + TimeUnit.MILLISECONDS)); - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> stateUpdates.contains(JobState.CANCELED)); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> stateUpdates.contains(JobState.CANCELED)); - // Clean up - jobQueueManagerAPI.close(); + // Clean up + jobQueueManagerAPI.close(); + } } /** diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java index e2889dc18f46..e25ee222c34d 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java @@ -5,6 +5,7 @@ import com.dotcms.contenttype.model.type.ContentType; import com.dotcms.datagen.TestDataUtils; +import com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.DefaultProgressTracker; @@ -27,6 +28,7 @@ import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.jboss.weld.junit5.EnableWeld; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -73,6 +75,188 @@ static void setUp() throws Exception { request = JobUtil.generateMockRequest(systemUser, defaultSite.getHostname()); } + /** + * Tests the preview mode of the content import process. This test: + *
    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in preview mode
  • + *
  • Verifies the preview results and metadata
  • + *
  • Verifies there is no content creation in the database
  • + *
+ * + *

The test ensures that preview mode properly validates the content + * without actually creating it in the system using the content type variable instead of the ID. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_using_content_type_variable() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "1", testContentType.variable(), + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Preview metadata should not be null"); + assertNotNull(metadata.get("errors"), "Preview metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Preview metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Preview metadata errors should be empty"); + + // Verify no content was created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(0, importedContent.size(), "Imported content should have no items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Scenario: Test the preview mode of the content import process with an invalid content type. + *

+ * Expected: A JobValidationException should be thrown. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_invalid_content_type_variable() throws Exception { + + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "1", "doesNotExist", + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + try { + // Process the job in preview mode + processor.process(testJob); + Assertions.fail("A JobValidationException should have been thrown here."); + } catch (Exception e) { + Assertions.assertInstanceOf(JobValidationException.class, e); + } + } + + /** + * Tests the preview mode of the content import process. This test: + *

    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in preview mode
  • + *
  • Verifies the preview results and metadata
  • + *
  • Verifies there is no content creation in the database
  • + *
+ * + *

The test ensures that preview mode properly validates the content + * without actually creating it in the system using the language ISO code instead of the ID. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_using_language_iso_code() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "en-us", testContentType.variable(), + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Preview metadata should not be null"); + assertNotNull(metadata.get("errors"), "Preview metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Preview metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Preview metadata errors should be empty"); + + // Verify no content was created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(0, importedContent.size(), "Imported content should have no items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Scenario: Test the preview mode of the content import process with an invalid language. + *

+ * Expected: A JobValidationException should be thrown. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_invalid_language() throws Exception { + + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "12345", "doesNotExist", + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + try { + // Process the job in preview mode + processor.process(testJob); + Assertions.fail("A JobValidationException should have been thrown here."); + } catch (Exception e) { + Assertions.assertInstanceOf(JobValidationException.class, e); + } + } + /** * Tests the preview mode of the content import process. This test: *

    @@ -105,7 +289,8 @@ void test_process_preview() throws Exception { // Create test job final var testJob = createTestJob( - csvFile, "preview", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + csvFile, "preview", "1", testContentType.id(), + "b9d89c80-3d88-4311-8365-187323c96436" ); // Process the job in preview mode @@ -163,7 +348,8 @@ void test_process_publish() throws Exception { // Create test job final var testJob = createTestJob( - csvFile, "publish", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + csvFile, "publish", "1", testContentType.id(), + "b9d89c80-3d88-4311-8365-187323c96436" ); // Process the job in preview mode @@ -205,14 +391,16 @@ private ContentType createTestContentType() { * * @param csvFile The CSV file containing the content to be imported * @param cmd The command to execute ('preview' or 'publish') - * @param contentTypeId The ID of the content type for the imported content + * @param contentType The content type for the imported content + * @param language The language of the imported content * @param workflowActionId The ID of the workflow action to be applied * @return A configured {@link Job} instance ready for processing * @throws IOException if there's an error reading the CSV file * @throws DotSecurityException if there's a security violation during job creation */ - private Job createTestJob(final File csvFile, final String cmd, final String contentTypeId, - final String workflowActionId) throws IOException, DotSecurityException { + private Job createTestJob(final File csvFile, final String cmd, final String language, + final String contentType, final String workflowActionId) + throws IOException, DotSecurityException { final Map jobParameters = new HashMap<>(); @@ -221,9 +409,11 @@ private Job createTestJob(final File csvFile, final String cmd, final String con jobParameters.put("userId", systemUser.getUserId()); jobParameters.put("siteName", defaultSite.getHostname()); jobParameters.put("siteIdentifier", defaultSite.getIdentifier()); - jobParameters.put("contentType", contentTypeId); + jobParameters.put("contentType", contentType); jobParameters.put("workflowActionId", workflowActionId); - jobParameters.put("language", "1"); + if (language != null) { + jobParameters.put("language", language); + } final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); try (final var fileInputStream = new FileInputStream(csvFile)) { diff --git a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json index cbc8bc3971d6..14ecba3220a9 100644 --- a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json +++ b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json @@ -1,6 +1,6 @@ { "info": { - "_postman_id": "cc2de2d8-aecf-4063-a97c-089965ba573d", + "_postman_id": "be9c354a-6c94-4b10-be0e-bc0c1b324c24", "name": "JobQueueResource API Tests", "description": "Postman collection for testing the JobQueueResource API endpoints.", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", @@ -772,7 +772,7 @@ "response": [] }, { - "name": "Cancel Job", + "name": "Create Failing Job", "event": [ { "listen": "test", @@ -782,24 +782,10 @@ " pm.response.to.have.status(200);", "});", "", - "// Check if cancellation message is returned", "var jsonData = pm.response.json();", - "pm.test(\"Job cancelled successfully\", function () {", - " pm.expect(jsonData.entity).to.include('Cancellation request successfully sent to job');", - "});", - "", - "var jobId = pm.collectionVariables.get(\"jobId\");", - "console.log(\" At the time this request was sent \" + jobId);", - "pm.collectionVariables.set(\"cancelledJobId\",jobId);" - ], - "type": "text/javascript", - "packages": {} - } - }, - { - "listen": "prerequest", - "script": { - "exec": [ + "pm.expect(jsonData.entity).to.be.a('String');", + "// Save jobId to environment variable", + "pm.environment.set(\"failingJobId\", jsonData.entity);", "" ], "type": "text/javascript", @@ -810,8 +796,17 @@ "request": { "method": "POST", "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"fail\": true\n}", + "options": { + "raw": { + "language": "json" + } + } + }, "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", + "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", "host": [ "{{baseUrl}}" ], @@ -819,16 +814,15 @@ "api", "v1", "jobs", - "{{jobId}}", - "cancel" + "failSuccess" ] }, - "description": "Cancels a specific job." + "description": "Creates a new job in the specified queue (Create Failing Job)" }, "response": [] }, { - "name": "Create Failing Job", + "name": "Create Success Job", "event": [ { "listen": "test", @@ -841,7 +835,7 @@ "var jsonData = pm.response.json();", "pm.expect(jsonData.entity).to.be.a('String');", "// Save jobId to environment variable", - "pm.environment.set(\"failingJobId\", jsonData.entity);", + "pm.environment.set(\"successJobId\", jsonData.entity);", "" ], "type": "text/javascript", @@ -854,7 +848,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"fail\": true\n}", + "raw": "{}", "options": { "raw": { "language": "json" @@ -873,12 +867,32 @@ "failSuccess" ] }, - "description": "Creates a new job in the specified queue (Create Failing Job)" + "description": "Creates a new job in the specified queue (Create a job that will finish sucessfully)" }, "response": [] }, { - "name": "Create Success Job", + "name": "Allow job to run before stopping", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "https://postman-echo.com/delay/10", + "protocol": "https", + "host": [ + "postman-echo", + "com" + ], + "path": [ + "delay", + "10" + ] + } + }, + "response": [] + }, + { + "name": "Cancel Job", "event": [ { "listen": "test", @@ -888,10 +902,24 @@ " pm.response.to.have.status(200);", "});", "", + "// Check if cancellation message is returned", "var jsonData = pm.response.json();", - "pm.expect(jsonData.entity).to.be.a('String');", - "// Save jobId to environment variable", - "pm.environment.set(\"successJobId\", jsonData.entity);", + "pm.test(\"Job cancelled successfully\", function () {", + " pm.expect(jsonData.entity).to.include('Cancellation request successfully sent to job');", + "});", + "", + "var jobId = pm.collectionVariables.get(\"jobId\");", + "console.log(\" At the time this request was sent \" + jobId);", + "pm.environment.set(\"cancelledJobId\",jobId);" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ "" ], "type": "text/javascript", @@ -902,17 +930,8 @@ "request": { "method": "POST", "header": [], - "body": { - "mode": "raw", - "raw": "{}", - "options": { - "raw": { - "language": "json" - } - } - }, "url": { - "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", + "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", "host": [ "{{baseUrl}}" ], @@ -920,10 +939,11 @@ "api", "v1", "jobs", - "failSuccess" + "{{jobId}}", + "cancel" ] }, - "description": "Creates a new job in the specified queue (Create a job that will finish sucessfully)" + "description": "Cancels a specific job." }, "response": [] }, @@ -1018,27 +1038,79 @@ "status" ] }, - "description": "Retrieves the status of a specific job." + "description": "Waits for the job to finish" }, "response": [] }, { - "name": "Monitor Non Existing Job", + "name": "Waiting Job to fail", "event": [ { "listen": "test", "script": { "exec": [ - "pm.test(\"Status code is 200\", function () {", + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is \"FAILED\"", + "if (response.entity.state === \"FAILED\") {", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to fail\");", + " console.log(`Job still processing, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in processing state after ${retryCount} attempts`);", + " });", + "}", + "", + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", " pm.response.to.have.status(200);", "});", "", - "pm.test(\"Response contains job-not-found event and 404 data\", function () {", - " const responseText = pm.response.text();", - " pm.expect(responseText).to.include(\"event: job-not-found\");", - " pm.expect(responseText).to.include(\"data: 404\");", - "});", - "" + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", + "}", + "", + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" ], "type": "text/javascript", "packages": {} @@ -1047,14 +1119,9 @@ ], "request": { "method": "GET", - "header": [ - { - "key": "Accept", - "value": "text/event-stream" - } - ], + "header": [], "url": { - "raw": "{{baseUrl}}/api/v1/jobs/nonExistingJob/monitor", + "raw": "{{baseUrl}}/api/v1/jobs/{{failingJobId}}/status", "host": [ "{{baseUrl}}" ], @@ -1062,99 +1129,67 @@ "api", "v1", "jobs", - "nonExistingJob", - "monitor" + "{{failingJobId}}", + "status" ] }, - "description": "Monitors a specific job using Server-Sent Events (SSE)." + "description": "Waits for the job to finish" }, "response": [] }, { - "name": "Get Job Status Expect Cancel", + "name": "Waiting Job to be canceled", "event": [ { "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Number of times to retry", - "const retryDelay = 4000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variable", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // fallback to baseURL if serverURL is not defined", - "", - "// Assuming 'jobId' is set as an environment variable", - "const jobId = pm.collectionVariables.get('cancelledJobId');", - "const checkUrl = `${serverURL}/api/v1/jobs/${jobId}/status`;", - "const cancelUrl = `${serverURL}/api/v1/jobs/${jobId}/cancel`;", - "", - "// Function to check the job state", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"checkURL url :: \"+checkUrl); ", - " console.log(\"cancelUrl url :: \"+cancelUrl); ", - " const token = pm.collectionVariables.get('jwt');", - "", - " pm.sendRequest({", - " url:checkUrl,", - " method:'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, ", - " function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job status:\", err);", - " return;", - " }", - " ", - " let jsonData = response.json();", - " const jobState = jsonData.entity.state;", - " console.log(jobState);", - " ", - " // Test for \"CANCELED\" state", - " if (jobState === \"CANCELED\") {", - " pm.test(\"Job has been CANCELED\", function () {", - " pm.expect(jobState).to.eql(\"CANCELED\");", - " });", - " console.log(\"Job has been successfully canceled.\");", - " } else if (retriesLeft > 0) {", - " console.log(\" retriesLeft :: \"+retriesLeft);", - " // Send a cancel POST request and retry", - " pm.sendRequest({", - " url: cancelUrl,", - " method: 'POST',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (cancelErr, cancelResponse) {", - " if (cancelErr) {", - " console.error(\"Error sending cancel request:\", cancelErr);", - " } else {", - " console.log(`Cancel request sent. Status: ${cancelResponse.status}`);", - " }", - " ", - " // Wait for a delay and then check the status again", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " });", - " } else {", - " // If maximum retries are reached and job is still not canceled", - " pm.test(\"Job has not been CANCELED after maximum retries\", function () {", - " pm.expect(jobState).to.eql(\"CANCELED\");", - " });", - " console.warn(\"Job status is still not 'CANCELED' after maximum retries.\");", - " }", + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is \"CANCELED\"", + "if (response.entity.state === \"CANCELED\") {", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to be canceled\");", + " console.log(`Job still processing, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in processing state after ${retryCount} attempts`);", " });", "}", "", - "// Initial job state check", - "checkJobState(maxRetries);", - "" + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" ], "type": "text/javascript", "packages": {} @@ -1164,15 +1199,13 @@ "listen": "prerequest", "script": { "exec": [ - "function sleep(milliseconds) {", - " const start = Date.now();", - " while (Date.now() - start < milliseconds) {", - " // Busy-wait loop that blocks the execution", - " }", + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", "}", "", - "", - "sleep(9000);" + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" ], "type": "text/javascript", "packages": {} @@ -1195,7 +1228,55 @@ "status" ] }, - "description": "Retrieves the status of a specific job. We expect to get one in status Canceled." + "description": "Waits for the job to finish" + }, + "response": [] + }, + { + "name": "Monitor Non Existing Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response contains job-not-found event and 404 data\", function () {", + " const responseText = pm.response.text();", + " pm.expect(responseText).to.include(\"event: job-not-found\");", + " pm.expect(responseText).to.include(\"data: 404\");", + "});", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [ + { + "key": "Accept", + "value": "text/event-stream" + } + ], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/nonExistingJob/monitor", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "nonExistingJob", + "monitor" + ] + }, + "description": "Monitors a specific job using Server-Sent Events (SSE)." }, "response": [] }, @@ -1207,7 +1288,7 @@ "script": { "exec": [ "// Get the expected job ID from collection variables", - "const jobId = pm.collectionVariables.get('cancelledJobId');", + "const jobId = pm.environment.get('cancelledJobId');", "", "// Parse the response JSON", "const response = pm.response.json();", @@ -1284,6 +1365,9 @@ "listen": "test", "script": { "exec": [ + "// Get the expected job ID from collection variables", + "const jobId = pm.environment.get('successJobId');", + "", "// Parse the response JSON", "const response = pm.response.json();", "", @@ -1305,7 +1389,7 @@ "", "// Validate that the job ID in the response matches the expected job ID", "pm.test(\"Job ID should match expected job ID\", function () {", - " pm.expect(response.entity.jobs[0].id).to.eql(pm.environment.get('successJobId'));", + " pm.expect(response.entity.jobs[0].id).to.eql(jobId);", "});" ], "type": "text/javascript", @@ -1359,74 +1443,32 @@ "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Maximum number of retries", - "const retryDelay = 2000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variables", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // Fallback to baseURL if serverURL is not defined", - "", - "// Define the URL for job status verification", - "const checkUrl = `${serverURL}/api/v1/jobs/failed`;", - "", - "// Function to check the status of jobs", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"Checking jobs URL: \" + checkUrl);", - " const token = pm.collectionVariables.get('jwt'); ", - " // Send a GET request to fetch job statuses", - " pm.sendRequest({", - " url: checkUrl,", - " method: 'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job statuses:\", err);", - " return;", - " }", - "", - " let jsonData = response.json();", - " let jobs = jsonData.entity.jobs;", - "", - " if (jobs.length > 0) {", - " // Check if all jobs have the \"FAILED\" status", - " const allFailed = jobs.every(job => job.state === \"FAILED\");", - "", - " if (allFailed) {", - " // Postman test to validate that all jobs are in the \"FAILED\" state", - " pm.test(\"All jobs are in 'FAILED' state\", function () {", - " pm.expect(allFailed).to.be.true;", - " });", - " console.log(\"All jobs are in 'FAILED' state.\");", - " } else {", - " // If any job is not in the \"FAILED\" state", - " pm.test(\"Some jobs are not in 'FAILED' state\", function () {", - " pm.expect(allFailed).to.be.true; // This will fail if not all jobs are \"FAILED\"", - " });", - " console.warn(\"Not all jobs are in 'FAILED' state.\");", - " }", - " } else if (retriesLeft > 0) {", - " // If no jobs are found and retries are left, wait and retry", - " console.log(\"No jobs available, retries left: \" + retriesLeft);", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " } else {", - " // If no jobs and no retries are left", - " pm.test(\"Maximum retries reached, no jobs received.\", function () {", - " pm.expect(jobs.length).to.be.greaterThan(0); // This will fail if no jobs are found", - " });", - " console.warn(\"No jobs found after maximum retries.\");", - " }", - " });", - "}", + "// Get the expected job ID from collection variables", + "const jobId = pm.environment.get('failingJobId');", "", - "// Start job status check with the maximum number of retries", - "checkJobState(maxRetries);", - "" + "// Parse the response JSON", + "const response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains only one job", + "pm.test(\"Jobs array should contain only one job\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(1);", + "});", + "", + "// Validate that the job ID in the response matches the expected job ID", + "pm.test(\"Job ID should match expected job ID\", function () {", + " pm.expect(response.entity.jobs[0].id).to.eql(jobId);", + "});" ], "type": "text/javascript", "packages": {} @@ -1465,83 +1507,50 @@ "response": [] }, { - "name": "List Jobs Expect Fail and Cancelled", + "name": "List Jobs Expect Fail, Completed and Cancelled", "event": [ { "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Maximum number of retries", - "const retryDelay = 2000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variables", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // Use baseURL as fallback if serverURL is not defined", - "", - "// Define the URL to check job statuses", - "const checkUrl = `${serverURL}/api/v1/jobs`;", - "", - "// Function to check if there are jobs in \"FAILED\" or \"CANCELED\" state", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"Checking jobs URL: \" + checkUrl);", - " const token = pm.collectionVariables.get(\"jwt\");", - " // Send a GET request to get the job statuses", - " pm.sendRequest({", - " url: checkUrl,", - " method: 'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job statuses:\", err);", - " return;", - " }", - "", - " let jsonData = response.json();", - " let jobs = jsonData.entity.jobs;", - "", - " if (jobs.length > 0) {", - " // Check if there are jobs with \"FAILED\" and \"CANCELED\" status", - " const hasFailed = jobs.some(job => job.state === \"FAILED\");", - " const hasCanceled = jobs.some(job => job.state === \"CANCELED\");", - "", - " // Postman test to validate that there are jobs with \"FAILED\" statuses", - " pm.test(\"There are jobs in 'FAILED' state\", function () {", - " pm.expect(hasFailed).to.be.true; ", - " });", - "", - " // Postman test to validate that there are jobs with \"CANCELED\" statuses", - " pm.test(\"There are jobs in 'CANCELED' state\", function () { ", - " pm.expect(hasCanceled).to.be.true;", - " });", - "", - " if (hasFailed && hasCanceled) {", - " console.log(\"Found jobs in 'FAILED' and 'CANCELED' state.\");", - " } else {", - " console.warn(\"Did not find jobs in both 'FAILED' and 'CANCELED' states.\");", - " }", - " } else if (retriesLeft > 0) {", - " // If no jobs are found and retries are left, wait and retry", - " console.log(\"No jobs available, retries left: \" + retriesLeft);", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " } else {", - " // If no jobs are found and no retries are left", - " pm.test(\"Maximum retries reached, no jobs received.\", function () {", - " pm.expect(jobs.length).to.be.greaterThan(0); // This will fail if no jobs are found", - " });", - " console.warn(\"No jobs found after reaching maximum retries.\");", - " }", - " });", - "}", + "// Parse the response JSON", + "const response = pm.response.json();", "", - "// Start checking job statuses with the maximum number of retries", - "checkJobState(maxRetries);", - "" + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains jobs", + "pm.test(\"Jobs array should have data\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(3);", + "});", + "", + "// Check if there are jobs with \"FAILED\" and \"CANCELED\" status", + "const hasFailed = response.entity.jobs.some(job => job.state === \"FAILED\");", + "const hasCanceled = response.entity.jobs.some(job => job.state === \"CANCELED\");", + "const hasCompleted = response.entity.jobs.some(job => job.state === \"COMPLETED\");", + "", + "// Postman test to validate that there are jobs with \"FAILED\" statuses", + "pm.test(\"There are jobs in 'FAILED' state\", function () {", + " pm.expect(hasFailed).to.be.true; ", + "});", + "", + "// Postman test to validate that there are jobs with \"CANCELED\" statuses", + "pm.test(\"There are jobs in 'CANCELED' state\", function () { ", + " pm.expect(hasCanceled).to.be.true;", + "});", + "", + "// Postman test to validate that there are jobs with \"COMPLETED\" statuses", + "pm.test(\"There are jobs in 'COMPLETED' state\", function () { ", + " pm.expect(hasCompleted).to.be.true;", + "});" ], "type": "text/javascript", "packages": {} @@ -1693,6 +1702,14 @@ { "key": "jwt", "value": "" + }, + { + "key": "successJobId", + "value": "" + }, + { + "key": "failingJobId", + "value": "" } ] } \ No newline at end of file