From a9329c4ec96304038f0bfb4bde3ea84a5e600bb8 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 13 Nov 2024 08:17:40 -0600 Subject: [PATCH] Feat (Core): Migrate import contentlets action to job processor - Feedback improvements (#30617) This pull request includes several changes to the job queue management system, focusing on improving the handling of job cancellation requests and updating exception handling. The key changes include the introduction of a new event for job cancellation requests, modifications to exception types for consistency, and enhancements to job state management. ### Improvements to job cancellation handling: * Added a new `JobCancelRequestEvent` class to represent job cancellation requests and handle them appropriately (`dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java`). * Introduced a method `onCancelRequestJob` to handle job cancellation requests and updated the `cancelJob` method to use this new method (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`). [[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R180-R184) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R862-R909) ### Exception handling improvements: * Replaced `JobQueueDataException` with `DotDataException` in various methods to standardize exception handling (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java`). [[1]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465L10) [[2]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465L97-R99) [[3]](diffhunk://#diff-97639376a50922f533c812eb8848f70e1913df8f14f0fb5bb582243f5660e465L118-R149) * Updated exception handling in `getActiveJobs`, `getCompletedJobs`, `getCanceledJobs`, and `getFailedJobs` methods to throw `DotDataException` instead of `JobQueueDataException` (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`). [[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L307-R323) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L327-R434) ### Job state management: * Added a method `getJobState` to fetch the state of a job and ensure the latest state is used during job progress updates (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`). * Enhanced the `handleJobCompletion` method to check if a job has been in a canceling state and update its status accordingly (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`). ### Code cleanup and imports: * Removed unused import `JobQueueDataException` and added necessary imports for new event handling (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java`). * Added imports for new event handling and exception classes (`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`). [[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R3-R8) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R20) [[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R37-R44) --- .../business/api/JobQueueConfigProducer.java | 2 +- .../jobs/business/api/JobQueueManagerAPI.java | 21 +- .../business/api/JobQueueManagerAPIImpl.java | 227 ++++--- .../api/events/JobCancelRequestEvent.java | 38 ++ .../api/events/RealTimeJobMonitor.java | 98 ++- .../error/JobCancellationException.java | 10 + .../dotcms/jobs/business/job/JobCache.java | 64 ++ .../jobs/business/job/JobCacheImpl.java | 90 +++ .../dotcms/jobs/business/job/JobState.java | 15 +- .../impl/ImportContentletsProcessor.java | 385 +++++------- .../processor/impl/LargeFileReader.java | 12 +- .../dotcms/jobs/business/queue/JobQueue.java | 18 +- .../jobs/business/queue/PostgresJobQueue.java | 65 +- .../dotcms/jobs/business/util/JobUtil.java | 18 + .../rest/api/v1/job/JobQueueHelper.java | 97 ++- .../rest/api/v1/job/JobQueueResource.java | 72 ++- .../rest/api/v1/job/SSEConnectionManager.java | 270 ++++++++ .../dotmarketing/business/CacheLocator.java | 15 +- .../JobQueueManagerAPIIntegrationTest.java | 10 +- .../business/api/JobQueueManagerAPITest.java | 246 +++++--- ...rtContentletsProcessorIntegrationTest.java | 204 +++++- ...ueResourceAPITests.postman_collection.json | 589 +++++++++--------- 22 files changed, 1800 insertions(+), 766 deletions(-) create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java create mode 100644 dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java index 8e10fe7e47b9..58aa8a9869a3 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java @@ -18,7 +18,7 @@ public class JobQueueConfigProducer { // The interval in milliseconds to poll for job updates. static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty( - "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 1000 + "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 3000 ); /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 86f5b24ca77d..8ada82da14c0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -7,7 +7,6 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.queue.JobQueue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotmarketing.exception.DotDataException; import java.util.Map; import java.util.Optional; @@ -94,10 +93,10 @@ String createJob(String queueName, Map parameters) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException; + throws DotDataException; /** * Retrieves a list of jobs. @@ -115,9 +114,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of completed jobs @@ -125,9 +124,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of completed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of canceled jobs @@ -135,9 +134,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of canceled jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of failed jobs @@ -145,9 +144,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of failed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException; /** * Cancels a job. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index ffb777cf8791..0c74f378d4e6 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -1,8 +1,11 @@ package com.dotcms.jobs.business.api; +import com.dotcms.api.system.event.Payload; +import com.dotcms.api.system.event.SystemEventType; import com.dotcms.business.CloseDBIfOpened; import com.dotcms.business.WrapInTransaction; import com.dotcms.jobs.business.api.events.EventProducer; +import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.JobCanceledEvent; import com.dotcms.jobs.business.api.events.JobCancellingEvent; import com.dotcms.jobs.business.api.events.JobCompletedEvent; @@ -14,6 +17,7 @@ import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; +import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; @@ -30,11 +34,14 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.system.event.local.model.EventSubscriber; +import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; +import io.vavr.control.Try; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; @@ -170,6 +177,11 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, // Events this.realTimeJobMonitor = realTimeJobMonitor; this.eventProducer = eventProducer; + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancelRequestEvent.class, + (EventSubscriber) this::onCancelRequestJob + ); } @Override @@ -304,11 +316,11 @@ public Job getJob(final String jobId) throws DotDataException { @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException { + throws DotDataException { try { return jobQueue.getActiveJobs(queueName, page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @@ -324,45 +336,41 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot @CloseDBIfOpened @Override - public JobPaginatedResult getActiveJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getActiveJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCompletedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCompletedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching completed jobs", e); + throw new DotDataException("Error fetching completed jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCanceledJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCanceledJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching canceled jobs", e); + throw new DotDataException("Error fetching canceled jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getFailedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getFailedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching failed jobs", e); + throw new DotDataException("Error fetching failed jobs", e); } } @@ -370,37 +378,60 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) @Override public void cancelJob(final String jobId) throws DotDataException { - final Job job; - try { - job = jobQueue.getJob(jobId); - } catch (JobNotFoundException e) { - throw new DoesNotExistException(e); - } catch (JobQueueDataException e) { - throw new DotDataException("Error fetching job", e); + final Job job = getJob(jobId); + + if (job.state() == JobState.PENDING || job.state() == JobState.RUNNING) { + handleJobCancelRequest(job); + } else { + Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. " + + "Current state: " + job.state()); } - final Optional instance = getInstance(jobId); - if (instance.isPresent()) { - final var processor = instance.get(); - if (processor instanceof Cancellable) { - try { - Logger.info(this, "Cancelling job " + jobId); - ((Cancellable) processor).cancel(job); - handleJobCancelling(job, processor); - } catch (Exception e) { - final var error = new DotDataException("Error cancelling job " + jobId, e); - Logger.error(JobQueueManagerAPIImpl.class, error); - throw error; + } + + /** + * Handles the cancellation of a job based on the given JobCancelRequestEvent. Retrieves the job + * and checks its state. If the job is in a PENDING or RUNNING state, attempts to cancel it by + * leveraging the job's associated processor. Logs and throws exceptions if any issues occur + * during the cancellation process. + * + * @param event The event that triggers the job cancellation request. + */ + @VisibleForTesting + @WrapInTransaction + void onCancelRequestJob(final JobCancelRequestEvent event) { + + try { + + final var job = getJob(event.getJob().id()); + if (job.state() == JobState.PENDING + || job.state() == JobState.RUNNING + || job.state() == JobState.CANCEL_REQUESTED) { + + final Optional instance = getInstance(job.id()); + if (instance.isPresent()) { + final var processor = instance.get(); + if (processor instanceof Cancellable) { + handleJobCancelling(job, processor); + } else { + final var error = new JobCancellationException( + job.id(), "Job is not Cancellable"); + Logger.error(JobQueueManagerAPIImpl.class, error); + throw error; + } + } else { + // In a cluster, the job may be running on another server + Logger.debug(this, + "Job cancellation requested. No processor found for job " + job.id()); } } else { - final var error = new DotDataException("Job " + jobId + " cannot be canceled"); - Logger.error(JobQueueManagerAPIImpl.class, error); - throw error; + Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. " + + "Current state: " + job.state()); } - } else { - Logger.error(this, "No processor found for job " + jobId); - throw new JobProcessorNotFoundException(job.queueName(), jobId); + } catch (DotDataException e) { + throw new JobCancellationException(event.getJob().id(), e); } + } @Override @@ -465,6 +496,24 @@ private void pollJobUpdates() { } } + /** + * Fetches the state of a job from the job queue using the provided job ID. + * + * @param jobId the unique identifier of the job whose state is to be fetched + * @return the current state of the job + * @throws DotDataException if there is an error accessing the job state data + */ + @CloseDBIfOpened + private JobState getJobState(final String jobId) throws DotDataException { + try { + return jobQueue.getJobState(jobId); + } catch (JobNotFoundException e) { + throw new DoesNotExistException(e); + } catch (JobQueueDataException e) { + throw new DotDataException("Error fetching job state", e); + } + } + /** * Updates the progress of a job and notifies its watchers. * @@ -484,7 +533,11 @@ private float updateJobProgress(final Job job, final ProgressTracker progressTra // Only update progress if it has changed if (progress > previousProgress) { - Job updatedJob = job.withProgress(progress); + // Make sure we have the latest state, the job of the running processor won't + // be updated with changes on the state, like a cancel request. + final var latestState = getJobState(job.id()); + + Job updatedJob = job.withProgress(progress).withState(latestState); jobQueue.updateJobProgress(job.id(), updatedJob.progress()); eventProducer.getEvent(JobProgressUpdatedEvent.class).fire( @@ -708,19 +761,14 @@ private void processJob(final Job job) throws DotDataException { } catch (DotDataException e) { throw new DotRuntimeException("Error updating job progress", e); } - }, 0, 1, TimeUnit.SECONDS + }, 0, 2, TimeUnit.SECONDS ); // Process the job processor.process(runningJob); - if (jobQueue.hasJobBeenInState(runningJob.id(), JobState.CANCELLING)) { - handleJobCancellation(runningJob, processor); - } else { - handleJobCompletion(runningJob, processor); - } - //Free up resources - removeInstanceRef(runningJob.id()); + // The job finished processing + handleJobCompletion(runningJob, processor); } catch (Exception e) { Logger.error(this, @@ -729,6 +777,9 @@ private void processJob(final Job job) throws DotDataException { handleJobFailure( runningJob, processor, e, "Job execution" ); + } finally { + //Free up resources + removeInstanceRef(runningJob.id()); } } else { @@ -808,27 +859,54 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) final float progress = getJobProgress(job); - final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); - updateJobStatus(completedJob); - eventProducer.getEvent(JobCompletedEvent.class).fire( - new JobCompletedEvent(completedJob, LocalDateTime.now()) - ); + try { + if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) { + Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); + updateJobStatus(canceledJob); + eventProducer.getEvent(JobCanceledEvent.class).fire( + new JobCanceledEvent(canceledJob, LocalDateTime.now()) + ); + } else { + final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); + updateJobStatus(completedJob); + eventProducer.getEvent(JobCompletedEvent.class).fire( + new JobCompletedEvent(completedJob, LocalDateTime.now()) + ); + } + } catch (JobQueueDataException e) { + final var errorMessage = "Error updating job status"; + Logger.error(this, errorMessage, e); + throw new DotDataException(errorMessage, e); + } } /** - * Handles the cancellation of a job. + * Handles the request to cancel a job. * - * @param job The job that was canceled. - * @param processor The processor that handled the job. + * @param job The job to cancel. */ @WrapInTransaction - private void handleJobCancelling(final Job job, final JobProcessor processor) - throws DotDataException { + private void handleJobCancelRequest(final Job job) throws DotDataException { - Job cancelJob = job.withState(JobState.CANCELLING); + Job cancelJob = job.withState(JobState.CANCEL_REQUESTED); updateJobStatus(cancelJob); - eventProducer.getEvent(JobCancellingEvent.class).fire( - new JobCancellingEvent(cancelJob, LocalDateTime.now()) + + // Prepare the cancel request events + final JobCancelRequestEvent cancelRequestEvent = new JobCancelRequestEvent( + cancelJob, LocalDateTime.now() + ); + + // LOCAL event + APILocator.getLocalSystemEventsAPI().notify(cancelRequestEvent); + + // CLUSTER WIDE event + Try.run(() -> APILocator.getSystemEventsAPI() + .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(cancelRequestEvent))) + .onFailure(e -> Logger.error(JobQueueManagerAPIImpl.this, e.getMessage())); + + // CDI event + eventProducer.getEvent(JobCancelRequestEvent.class).fire( + cancelRequestEvent ); } @@ -839,23 +917,22 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) * @param processor The processor that handled the job. */ @WrapInTransaction - private void handleJobCancellation(final Job job, final JobProcessor processor) - throws DotDataException { + private void handleJobCancelling(final Job job, final JobProcessor processor) { - final var resultMetadata = processor.getResultMetadata(job); + try { + Logger.info(this, "Cancelling job " + job.id()); + ((Cancellable) processor).cancel(job); - JobResult jobResult = null; - if (resultMetadata != null && !resultMetadata.isEmpty()) { - jobResult = JobResult.builder().metadata(resultMetadata).build(); + Job cancelJob = job.withState(JobState.CANCELLING); + updateJobStatus(cancelJob); + eventProducer.getEvent(JobCancellingEvent.class).fire( + new JobCancellingEvent(cancelJob, LocalDateTime.now()) + ); + } catch (DotDataException e) { + final var error = new JobCancellationException(job.id(), e); + Logger.error(this, error); + throw error; } - - final float progress = getJobProgress(job); - - Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); - updateJobStatus(canceledJob); - eventProducer.getEvent(JobCanceledEvent.class).fire( - new JobCanceledEvent(canceledJob, LocalDateTime.now()) - ); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java new file mode 100644 index 000000000000..9554b1e8f573 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java @@ -0,0 +1,38 @@ +package com.dotcms.jobs.business.api.events; + +import com.dotcms.jobs.business.job.Job; +import java.time.LocalDateTime; + +/** + * Event fired when there is a request to cancel a job. + */ +public class JobCancelRequestEvent { + + private final Job job; + private final LocalDateTime canceledOn; + + /** + * Constructs a new JobCancelRequestEvent. + * + * @param job The job to cancel. + * @param canceledOn The timestamp when the cancel request was made. + */ + public JobCancelRequestEvent(Job job, LocalDateTime canceledOn) { + this.job = job; + this.canceledOn = canceledOn; + } + + /** + * @return The job to cancel. + */ + public Job getJob() { + return job; + } + + /** + * @return The timestamp when the cancel request was made. + */ + public LocalDateTime getCanceledOn() { + return canceledOn; + } +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index 29c9eab2e0b7..4c761158fa0e 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -3,14 +3,13 @@ import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; import com.dotmarketing.util.Logger; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import java.util.function.Predicate; import javax.enterprise.context.ApplicationScoped; @@ -77,35 +76,90 @@ public class RealTimeJobMonitor { * its own filter predicate. Watchers are automatically removed when a job reaches a final state * (completed, cancelled, or removed).

* + *

Thread Safety:

+ *
    + *
  • This method is thread-safe and can be called concurrently from multiple threads.
  • + *
  • Internally uses {@link CopyOnWriteArrayList} to store watchers, which provides: + *
      + *
    • Thread-safe reads without synchronization - all iteration operations use an immutable snapshot
    • + *
    • Thread-safe modifications - each modification creates a new internal copy
    • + *
    • Memory consistency effects - actions in a thread prior to placing an object into a + * CopyOnWriteArrayList happen-before actions subsequent to the access or removal + * of that element from the CopyOnWriteArrayList in another thread
    • + *
    + *
  • + *
  • The trade-off is that modifications (adding/removing watchers) are more expensive as they + * create a new copy of the internal array, but this is acceptable since: + *
      + *
    • Reads (notifications) are much more frequent than writes (registering/removing watchers)
    • + *
    • The number of watchers per job is typically small
    • + *
    • Registration/removal of watchers is not in the critical path
    • + *
    + *
  • + *
+ * * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates * @param filter Optional predicate to filter job updates (null means receive all updates) - * @throws IllegalArgumentException if jobId or watcher is null + * @throws NullPointerException if jobId or watcher is null * @see Predicates for common filter predicates + * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ public void registerWatcher(String jobId, Consumer watcher, Predicate filter) { + + Objects.requireNonNull(jobId, "jobId cannot be null"); + Objects.requireNonNull(watcher, "watcher cannot be null"); + jobWatchers.compute(jobId, (key, existingWatchers) -> { List watchers = Objects.requireNonNullElseGet( existingWatchers, - () -> Collections.synchronizedList(new ArrayList<>()) + CopyOnWriteArrayList::new ); - final var jobWatcher = JobWatcher.builder() + watchers.add(JobWatcher.builder() .watcher(watcher) - .filter(filter != null ? filter : job -> true).build(); + .filter(filter != null ? filter : job -> true) + .build()); + + Logger.debug(this, String.format( + "Added watcher for job %s. Total watchers: %d", jobId, watchers.size())); - watchers.add(jobWatcher); return watchers; }); } /** - * Registers a watcher for a specific job that receives all updates. - * This is a convenience method equivalent to calling {@code registerWatcher(jobId, watcher, null)}. + * Registers a watcher for a specific job. The watcher receives all updates for the job. + * + *

Multiple watchers can be registered for the same job. Watchers are automatically removed + * when a job reaches a final state (completed, cancelled, or removed).

+ * + *

Thread Safety:

+ *
    + *
  • This method is thread-safe and can be called concurrently from multiple threads.
  • + *
  • Internally uses {@link CopyOnWriteArrayList} to store watchers, which provides: + *
      + *
    • Thread-safe reads without synchronization - all iteration operations use an immutable snapshot
    • + *
    • Thread-safe modifications - each modification creates a new internal copy
    • + *
    • Memory consistency effects - actions in a thread prior to placing an object into a + * CopyOnWriteArrayList happen-before actions subsequent to the access or removal + * of that element from the CopyOnWriteArrayList in another thread
    • + *
    + *
  • + *
  • The trade-off is that modifications (adding/removing watchers) are more expensive as they + * create a new copy of the internal array, but this is acceptable since: + *
      + *
    • Reads (notifications) are much more frequent than writes (registering/removing watchers)
    • + *
    • The number of watchers per job is typically small
    • + *
    • Registration/removal of watchers is not in the critical path
    • + *
    + *
  • + *
* * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates - * @throws IllegalArgumentException if jobId or watcher is null + * @throws NullPointerException if jobId or watcher is null + * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ public void registerWatcher(String jobId, Consumer watcher) { registerWatcher(jobId, watcher, null); @@ -150,7 +204,14 @@ private void updateWatchers(Job job) { } } catch (Exception e) { Logger.error(this, "Error notifying job watcher for job " + job.id(), e); + + // Direct remove is thread-safe with CopyOnWriteArrayList watchers.remove(jobWatcher); + + // If this was the last watcher, clean up the map entry + if (watchers.isEmpty()) { + jobWatchers.remove(job.id()); + } } }); } @@ -162,7 +223,13 @@ private void updateWatchers(Job job) { * @param jobId The ID of the job whose watcher is to be removed. */ private void removeWatcher(String jobId) { - jobWatchers.remove(jobId); + + List removed = jobWatchers.remove(jobId); + if (removed != null) { + Logger.debug(this, + String.format("Removed all watchers for job %s. Watchers removed: %d", + jobId, removed.size())); + } } /** @@ -174,6 +241,15 @@ public void onJobStarted(@Observes JobStartedEvent event) { updateWatchers(event.getJob()); } + /** + * Handles the job cancel request event. + * + * @param event The JobCancelRequestEvent. + */ + public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { + updateWatchers(event.getJob()); + } + /** * Handles the job cancelling event. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java index 449ce08aee29..3b4ac80b19de 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobCancellationException.java @@ -6,6 +6,16 @@ */ public class JobCancellationException extends RuntimeException { + /** + * Constructs a new JobCancellationException with the specified job ID and cause. + * + * @param jobId The ID of the job that encountered an error during cancellation + * @param cause The underlying cause of the error (can be null) + */ + public JobCancellationException(String jobId, Throwable cause) { + super("Failed to cancel job " + jobId + ".", cause); + } + /** * Constructs a new JobCancellationException with the specified job ID and reason. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java new file mode 100644 index 000000000000..85daa8869659 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCache.java @@ -0,0 +1,64 @@ +package com.dotcms.jobs.business.job; + +import com.dotmarketing.business.Cachable; + +/** + * Interface for caching job objects. This interface extends the {@link Cachable} interface and + * provides methods for adding, retrieving, and removing jobs from the cache. + */ +public interface JobCache extends Cachable { + + /** + * Adds a job to the cache. + * + * @param job the job to be added to the cache + */ + void put(Job job); + + /** + * Adds a job state to the cache. + * + * @param jobId the ID of the job state to add to the cache + * @param jobState the state of the job to add to the cache + */ + void putState(String jobId, JobState jobState); + + /** + * Retrieves a job from the cache by its ID. + * + * @param jobId the ID of the job to be retrieved + * @return the job associated with the given ID, or null if no such job exists in the cache + */ + Job get(String jobId); + + /** + * Retrieves the state of a job from the cache by its ID. + * + * @param jobId the ID of the job whose state is to be retrieved + * @return the state of the job associated with the given ID, or null if no such state exists in + * the cache + */ + JobState getState(String jobId); + + /** + * Removes a job from the cache. + * + * @param job the job to be removed from the cache + */ + void remove(Job job); + + /** + * Removes a job from the cache by its ID. + * + * @param jobId the ID of the job to be removed + */ + void remove(String jobId); + + /** + * Removes a job state from the cache by its ID. + * + * @param jobId the ID of the job state to be removed + */ + void removeState(final String jobId); + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java new file mode 100644 index 000000000000..68d8cf3bb46e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobCacheImpl.java @@ -0,0 +1,90 @@ +package com.dotcms.jobs.business.job; + +import com.dotcms.util.DotPreconditions; +import com.dotmarketing.business.CacheLocator; +import com.dotmarketing.business.DotCacheAdministrator; +import com.dotmarketing.business.DotCacheException; + +public class JobCacheImpl implements JobCache { + + static final String JOB_BY_ID = "JOB_BY_ID_"; + static final String JOB_STATE_BY_ID = "JOB_STATE_BY_ID_"; + + @Override + public String getPrimaryGroup() { + return "JobCacheImpl"; + } + + @Override + public String[] getGroups() { + return new String[0]; + } + + private String getKeyById(final String id) { + return JOB_BY_ID + id; + } + + private String getStateKeyById(final String id) { + return JOB_STATE_BY_ID + id; + } + + @Override + public void put(final Job job) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.put(getKeyById(job.id()), job, getPrimaryGroup()); + } + + @Override + public void putState(final String jobId, final JobState jobState) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.put(getStateKeyById(jobId), jobState, getPrimaryGroup()); + } + + @Override + public Job get(final String jobId) { + DotPreconditions.checkNotNull(jobId); + + try { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + return (Job) cache.get(getKeyById(jobId), getPrimaryGroup()); + } catch (DotCacheException e) { + return null; + } + } + + @Override + public JobState getState(final String jobId) { + DotPreconditions.checkNotNull(jobId); + + try { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + return (JobState) cache.get(getStateKeyById(jobId), getPrimaryGroup()); + } catch (DotCacheException e) { + return null; + } + } + + @Override + public void remove(final Job job) { + remove(job.id()); + } + + @Override + public void remove(final String jobId) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.remove(getKeyById(jobId), getPrimaryGroup()); + } + + @Override + public void removeState(final String jobId) { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.remove(getStateKeyById(jobId), getPrimaryGroup()); + } + + @Override + public void clearCache() { + DotCacheAdministrator cache = CacheLocator.getCacheAdministrator(); + cache.flushGroup(getPrimaryGroup()); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java index 0bbdb4c50a0f..c13b8a6d3587 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java @@ -15,11 +15,6 @@ public enum JobState { */ RUNNING, - /** - * The job is currently being canceled. - */ - CANCELLING, - /** * The job has finished executing successfully. */ @@ -30,6 +25,16 @@ public enum JobState { */ FAILED, + /** + * The job is waiting to be canceled. + */ + CANCEL_REQUESTED, + + /** + * The job is currently being canceled. + */ + CANCELLING, + /** * The job was canceled before it could complete. */ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index 5c586ee62a59..3c8b446f8b39 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -1,5 +1,6 @@ package com.dotcms.jobs.business.processor.impl; +import com.dotcms.contenttype.exception.NotFoundInDbException; import com.dotcms.contenttype.model.type.ContentType; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessingException; @@ -20,8 +21,8 @@ import com.dotmarketing.exception.DotHibernateException; import com.dotmarketing.exception.DotSecurityException; import com.dotmarketing.portlets.contentlet.action.ImportAuditUtil; +import com.dotmarketing.portlets.languagesmanager.model.Language; import com.dotmarketing.util.AdminLogger; -import com.dotmarketing.util.FileUtil; import com.dotmarketing.util.ImportUtil; import com.dotmarketing.util.Logger; import com.google.common.hash.Hashing; @@ -138,11 +139,8 @@ public void process(final Job job) throws JobProcessingException { // Validate the job has the required data validate(job); - final var language = getLanguage(job); final var fileToImport = tempFile.get().file; final long totalLines = totalLines(job, fileToImport); - final Charset charset = language == -1 ? - Charset.defaultCharset() : FileUtil.detectEncodeType(fileToImport); // Create a progress callback function final var progressTracker = job.progressTracker().orElseThrow( @@ -154,11 +152,10 @@ public void process(final Job job) throws JobProcessingException { progressTracker.updateProgress(Math.min(1.0f, Math.max(0.0f, progressPercentage))); }; - if (CMD_PREVIEW.equals(command)) { - handlePreview(job, language, fileToImport, charset, user, progressCallback); - } else if (CMD_PUBLISH.equals(command)) { - handlePublish(job, language, fileToImport, charset, user, progressCallback); - } + // Handle the import operation based on the command, by default any command that is not + // "publish" is considered preview. + final boolean isPublish = CMD_PUBLISH.equals(command); + handleImport(!isPublish, job, fileToImport, user, progressCallback); if (!cancellationRequested.get()) { // Ensure the progress is at 100% when the job is done @@ -200,76 +197,34 @@ public Map getResultMetadata(Job job) { } /** - * Handles the preview phase of content import. This method analyzes the CSV file and provides - * information about potential issues without actually importing the content. + * Handles the content import. Depending on the preview flag, this method will either analyze + * the content for potential issues or perform the actual import operation. * + * @param preview Flag indicating whether the operation is a preview or publish * @param job The import job configuration - * @param language The target language for import * @param fileToImport The CSV file to be imported - * @param charset The character encoding of the import file * @param user The user performing the import * @param progressCallback Callback for tracking import progress */ - private void handlePreview(final Job job, long language, final File fileToImport, - final Charset charset, final User user, final LongConsumer progressCallback) { - - try { - try (Reader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(fileToImport), charset))) { - - CsvReader csvReader = createCsvReader(reader); - CsvHeaderInfo headerInfo = processHeadersBasedOnLanguage(job, language, csvReader); - - final var previewResult = generatePreview(job, user, - headerInfo.headers, csvReader, headerInfo.languageCodeColumn, - headerInfo.countryCodeColumn, progressCallback); - resultMetadata = new HashMap<>(previewResult); - } - } catch (Exception e) { + private void handleImport(final boolean preview, final Job job, final File fileToImport, + final User user, final LongConsumer progressCallback) { - try { - HibernateUtil.rollbackTransaction(); - } catch (DotHibernateException he) { - Logger.error(this, he.getMessage(), he); - } - - final var errorMessage = "An error occurred when analyzing the CSV file."; - Logger.error(this, errorMessage, e); - throw new JobProcessingException(job.id(), errorMessage, e); + if (!preview) { + AdminLogger.log( + ImportContentletsProcessor.class, "process", + "Importing Contentlets", user + ); } - } - - /** - * Handles the publish phase of content import. This method performs the actual content import - * operation, creating or updating content based on the CSV file. - * - * @param job The import job configuration - * @param language The target language for import - * @param fileToImport The CSV file to be imported - * @param charset The character encoding of the import file - * @param user The user performing the import - * @param progressCallback Callback for tracking import progress - */ - private void handlePublish(final Job job, long language, final File fileToImport, - final Charset charset, final User user, final LongConsumer progressCallback) { - AdminLogger.log( - ImportContentletsProcessor.class, "process", - "Importing Contentlets", user - ); + try (Reader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(fileToImport), + Charset.defaultCharset()))) { - try { - try (Reader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(fileToImport), charset))) { + CsvReader csvReader = createCsvReader(reader); - CsvReader csvReader = createCsvReader(reader); - CsvHeaderInfo headerInfo = readPublishHeaders(language, csvReader); - - final var importResults = processFile(job, user, headerInfo.headers, csvReader, - headerInfo.languageCodeColumn, headerInfo.countryCodeColumn, - progressCallback); - resultMetadata = new HashMap<>(importResults); - } + final var importResults = processImport(preview, job, user, csvReader, + progressCallback); + resultMetadata = new HashMap<>(importResults); } catch (Exception e) { try { @@ -278,7 +233,8 @@ private void handlePublish(final Job job, long language, final File fileToImport Logger.error(this, he.getMessage(), he); } - final var errorMessage = "An error occurred when importing the CSV file."; + final var errorMessage = String.format("An error occurred when %s the CSV file.", + preview ? "analyzing" : "importing"); Logger.error(this, errorMessage, e); throw new JobProcessingException(job.id(), errorMessage, e); } finally { @@ -288,75 +244,42 @@ private void handlePublish(final Job job, long language, final File fileToImport } /** - * Reads and analyzes the content of the CSV import file to determine potential errors, - * inconsistencies or warnings, and provide the user with useful information regarding the - * contents of the file. - * - * @param job - The {@link Job} being processed. - * @param user - The {@link User} performing this action. - * @param csvHeaders - The headers that make up the CSV file. - * @param csvReader - The actual data contained in the CSV file. - * @param languageCodeHeaderColumn - The column name containing the language code. - * @param countryCodeHeaderColumn - The column name containing the country code. - * @param progressCallback - The callback function to update the progress of the job. - * @throws DotDataException An error occurred when analyzing the CSV file. - */ - private Map> generatePreview(final Job job, final User user, - final String[] csvHeaders, final CsvReader csvReader, - final int languageCodeHeaderColumn, int countryCodeHeaderColumn, - final LongConsumer progressCallback) throws DotDataException { - - final var currentSiteId = getSiteIdentifier(job); - final var currentSiteName = getSiteName(job); - final var contentType = getContentType(job); - final var fields = getFields(job); - final var language = getLanguage(job); - final var workflowActionId = getWorkflowActionId(job); - final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); - - Logger.info(this, "-------- Starting Content Import Preview -------- "); - Logger.info(this, String.format("-> Content Type ID: %s", contentType)); - - return ImportUtil.importFile(0L, currentSiteId, contentType, fields, true, - (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, - countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); - } - - /** - * Executes the content import process after the review process has been run and displayed to - * the user. + * Executes the content import for a preview or publish operation. This method processes the + * CSV file and imports the content into dotCMS or reviews the content for potential issues. * + * @param preview - Flag indicating whether the operation is a preview or publish * @param job - The {@link Job} being processed. * @param user - The {@link User} performing this action. - * @param csvHeaders - The headers that make up the CSV file. * @param csvReader - The actual data contained in the CSV file. - * @param languageCodeHeaderColumn - The column name containing the language code. - * @param countryCodeHeaderColumn - The column name containing the country code. * @param progressCallback - The callback function to update the progress of the job. * @return The status of the content import performed by dotCMS. This provides information * regarding inconsistencies, errors, warnings and/or precautions to the user. * @throws DotDataException An error occurred when importing the CSV file. */ - private Map> processFile(final Job job, final User user, - final String[] csvHeaders, final CsvReader csvReader, - final int languageCodeHeaderColumn, final int countryCodeHeaderColumn, - final LongConsumer progressCallback) throws DotDataException { + private Map> processImport(final boolean preview, final Job job, + final User user, final CsvReader csvReader, final LongConsumer progressCallback) + throws DotDataException, IOException, DotSecurityException { final var currentSiteId = getSiteIdentifier(job); final var currentSiteName = getSiteName(job); - final var contentType = getContentType(job); + final var contentType = findContentType(job); final var fields = getFields(job); - final var language = getLanguage(job); + final var language = findLanguage(job); final var workflowActionId = getWorkflowActionId(job); final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); final var importId = jobIdToLong(job.id()); - Logger.info(this, "-------- Starting Content Import Process -------- "); - Logger.info(this, String.format("-> Content Type ID: %s", contentType)); + // Read headers and process language columns for multilingual imports + CsvHeaderInfo headerInfo = readHeaders(job, language == null, csvReader); - return ImportUtil.importFile(importId, currentSiteId, contentType, fields, false, - (language == -1), user, language, csvHeaders, csvReader, languageCodeHeaderColumn, - countryCodeHeaderColumn, workflowActionId, httpReq, progressCallback); + Logger.info(this, String.format("-------- Starting Content Import %s -------- ", + preview ? "Preview" : "Process")); + Logger.info(this, String.format("-> Content Type: %s", contentType.variable())); + + return ImportUtil.importFile(importId, currentSiteId, contentType.id(), fields, preview, + language == null, user, language == null ? -1 : language.getId(), + headerInfo.headers, csvReader, headerInfo.languageCodeColumn, + headerInfo.countryCodeColumn, workflowActionId, httpReq, progressCallback); } /** @@ -428,26 +351,19 @@ private String getWorkflowActionId(final Job job) { } /** - * Retrieves the language setting from the job parameters. Handles both string and long - * parameter types. + * Retrieves the language from the job parameters. * * @param job The job containing the parameters - * @return The language ID as a long, or -1 if not specified + * @return An optional containing the language string, or an empty optional if not present */ - private long getLanguage(final Job job) { + private Optional getLanguage(final Job job) { if (!job.parameters().containsKey(PARAMETER_LANGUAGE) || job.parameters().get(PARAMETER_LANGUAGE) == null) { - return -1; + return Optional.empty(); } - final Object language = job.parameters().get(PARAMETER_LANGUAGE); - - if (language instanceof String) { - return Long.parseLong((String) language); - } - - return (long) language; + return Optional.of((String) job.parameters().get(PARAMETER_LANGUAGE)); } /** @@ -481,23 +397,38 @@ public String[] getFields(final Job job) { */ private void validate(final Job job) { + // Validating the language (will throw an exception if it doesn't) + final Language language = findLanguage(job); + if (getContentType(job) != null && getContentType(job).isEmpty()) { - Logger.error(this.getClass(), "A Content Type is required"); - throw new JobValidationException(job.id(), "A Content Type is required"); + final var errorMessage = "A Content Type id or variable is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); } else if (getWorkflowActionId(job) != null && getWorkflowActionId(job).isEmpty()) { - Logger.error(this.getClass(), "Workflow action type is required"); - throw new JobValidationException(job.id(), "Workflow action type is required"); + final var errorMessage = "A Workflow Action id is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } else if (language == null && getFields(job).length == 0) { + final var errorMessage = + "A key identifying the different Language versions of the same " + + "content must be defined when importing multilingual files."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); } - // Security measure to prevent invalid attempts to import a host. try { + + // Make sure the content type exist (will throw an exception if it doesn't) + final var contentTypeFound = findContentType(job); + + // Security measure to prevent invalid attempts to import a host. final ContentType hostContentType = APILocator.getContentTypeAPI( - APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME - ); - final boolean isHost = (hostContentType.id().equals(getContentType(job))); + APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); + final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); if (isHost) { - Logger.error(this, "Invalid attempt to import a host."); - throw new JobValidationException(job.id(), "Invalid attempt to import a host."); + final var errorMessage = "Invalid attempt to import a host."; + Logger.error(this, errorMessage); + throw new JobValidationException(job.id(), errorMessage); } } catch (DotSecurityException | DotDataException e) { throw new JobProcessingException(job.id(), "Error validating content type", e); @@ -547,28 +478,33 @@ private Long totalLines(final Job job, final File dotTempFile) { } /** - * Reads and processes headers for publishing operation. + * Reads and processes headers from the CSV file. Handles both single and multilingual content + * imports. * - * @param language The target language for import - * @param csvreader The CSV reader containing the file data + * @param job The current import job + * @param isMultilingual Flag indicating whether the import is multilingual + * @param csvReader The CSV reader containing the file data * @return CsvHeaderInfo containing processed header information * @throws IOException if an error occurs reading the CSV file */ - private CsvHeaderInfo readPublishHeaders(long language, CsvReader csvreader) + private CsvHeaderInfo readHeaders(final Job job, boolean isMultilingual, CsvReader csvReader) throws IOException { - if (language == -1 && csvreader.readHeaders()) { - return findLanguageColumnsInHeaders(csvreader.getHeaders()); + + if (isMultilingual) { + return processMultilingualHeaders(job, csvReader); } + return new CsvHeaderInfo(null, -1, -1); } /** * Locates language-related columns in CSV headers. * + * @param job The current import job * @param headers Array of CSV header strings * @return CsvHeaderInfo containing the positions of language and country code columns */ - private CsvHeaderInfo findLanguageColumnsInHeaders(String[] headers) { + private CsvHeaderInfo findLanguageColumnsInHeaders(Job job, String[] headers) { int languageCodeColumn = -1; int countryCodeColumn = -1; @@ -585,6 +521,7 @@ private CsvHeaderInfo findLanguageColumnsInHeaders(String[] headers) { } } + validateLanguageColumns(job, languageCodeColumn, countryCodeColumn); return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); } @@ -600,39 +537,6 @@ private CsvReader createCsvReader(final Reader reader) { return csvreader; } - /** - * Processes CSV headers based on the specified language configuration. - * - * @param job The current import job - * @param language The target language for import - * @param csvReader The CSV reader to process headers from - * @return CsvHeaderInfo containing processed header information - * @throws IOException if an error occurs reading the CSV file - */ - private CsvHeaderInfo processHeadersBasedOnLanguage(final Job job, final long language, - final CsvReader csvReader) throws IOException { - if (language != -1) { - validateLanguage(job, language); - return new CsvHeaderInfo(null, -1, -1); - } - - return processMultilingualHeaders(job, csvReader); - } - - /** - * Validates the language configuration for import operations. - * - * @param job The current import job - * @param language The language identifier to validate - */ - private void validateLanguage(Job job, long language) { - if (language == 0) { - final var errorMessage = "Please select a valid Language."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - } - /** * Processes headers for multilingual content imports. * @@ -644,14 +548,6 @@ private void validateLanguage(Job job, long language) { private CsvHeaderInfo processMultilingualHeaders(final Job job, final CsvReader csvReader) throws IOException { - if (getFields(job).length == 0) { - final var errorMessage = - "A key identifying the different Language versions of the same " - + "content must be defined when importing multilingual files."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - if (!csvReader.readHeaders()) { final var errorMessage = "An error occurred when attempting to read the CSV file headers."; Logger.error(this, errorMessage); @@ -659,35 +555,7 @@ private CsvHeaderInfo processMultilingualHeaders(final Job job, final CsvReader } String[] headers = csvReader.getHeaders(); - return findLanguageColumns(job, headers); - } - - /** - * Locates language-related columns in CSV headers. - * - * @param headers Array of CSV header strings - * @return CsvHeaderInfo containing the positions of language and country code columns - */ - private CsvHeaderInfo findLanguageColumns(Job job, String[] headers) - throws JobProcessingException { - - int languageCodeColumn = -1; - int countryCodeColumn = -1; - - for (int column = 0; column < headers.length; ++column) { - if (headers[column].equals(LANGUAGE_CODE_HEADER)) { - languageCodeColumn = column; - } - if (headers[column].equals(COUNTRY_CODE_HEADER)) { - countryCodeColumn = column; - } - if (languageCodeColumn != -1 && countryCodeColumn != -1) { - break; - } - } - - validateLanguageColumns(job, languageCodeColumn, countryCodeColumn); - return new CsvHeaderInfo(headers, languageCodeColumn, countryCodeColumn); + return findLanguageColumnsInHeaders(job, headers); } /** @@ -708,6 +576,85 @@ private void validateLanguageColumns(Job job, int languageCodeColumn, int countr } } + /** + * Retrieves the existing content type based on an id or variable. + * + * @param job The current import job. + * @return The existing content type if found, otherwise fails with an exception. + * @throws DotSecurityException If there are security restrictions preventing the evaluation. + */ + private ContentType findContentType(final Job job) + throws DotSecurityException { + + final var contentTypeIdOrVar = getContentType(job); + final User user; + + // Retrieving the user requesting the import + try { + user = getUser(job); + } catch (DotDataException e) { + final var errorMessage = "Error retrieving user."; + Logger.error(this.getClass(), errorMessage); + throw new JobProcessingException(job.id(), errorMessage, e); + } + + try { + return APILocator.getContentTypeAPI(user, true) + .find(contentTypeIdOrVar); + } catch (NotFoundInDbException e) { + final var errorMessage = String.format( + "Content Type [%s] not found.", contentTypeIdOrVar + ); + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } catch (DotDataException e) { + final var errorMessage = String.format( + "Error finding Content Type [%s].", contentTypeIdOrVar + ); + Logger.error(this.getClass(), errorMessage); + throw new JobProcessingException(job.id(), errorMessage, e); + } + } + + /** + * Retrieves the existing language based on an id or ISO code. + * + * @param job The current import job. + * @return The existing language if found, otherwise fails with an exception. + */ + private Language findLanguage(final Job job) { + + // Read the language from the job parameters + final var languageIsoOrIdOptional = getLanguage(job); + if (languageIsoOrIdOptional.isEmpty()) { + return null; + } + + final var languageIsoOrId = languageIsoOrIdOptional.get(); + if (languageIsoOrId.equals("-1")) { + return null; + } + + // Retrieve the language based on the provided ISO code or ID + Language foundLanguage; + if (!languageIsoOrId.contains("-")) { + foundLanguage = APILocator.getLanguageAPI().getLanguage(languageIsoOrId); + } else { + final String[] codes = languageIsoOrId.split("[_|-]"); + foundLanguage = APILocator.getLanguageAPI().getLanguage(codes[0], codes[1]); + } + + if (foundLanguage != null && foundLanguage.getId() > 0) { + return foundLanguage; + } + + final var errorMessage = String.format( + "Language [%s] not found.", languageIsoOrId + ); + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(job.id(), errorMessage); + } + /** * Container class for CSV header information, particularly for handling language-related * columns in multilingual imports. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java index 36245a0c4ba6..1fc4b9708984 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java @@ -45,6 +45,12 @@ public void process(Job job) { final DotTempFile dotTempFile = tempFile.get(); doReadLargeFile(dotTempFile, nLines, maxLines, job); + + if (!working) { + Logger.info(this.getClass(), "Job cancelled: " + job.id()); + // Adding some delay to simulate some cancellation processing, this demo is too fast + delay(3000); + } } /** @@ -77,7 +83,7 @@ private void doReadLargeFile(DotTempFile dotTempFile, int nLines, int maxLines , if (lineCount == nLines) { lineCount = 0; // Reset the counter Logger.debug(this.getClass(), line); - delay(); + delay(1000); } final float progressPercentage = ((float) readCount / totalCount); progressTracker.ifPresent(tracker -> tracker.updateProgress(progressPercentage)); @@ -112,9 +118,9 @@ private Long countLines(DotTempFile dotTempFile) { return totalCount; } - private void delay() { + private void delay(final long millis) { Try.of(()->{ - Thread.sleep(1000); + Thread.sleep(millis); return null; }).onFailure(e->Logger.error(this.getClass(), "Error during delay", e)); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index af9d434bfcb7..0e792c1424fc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -40,6 +40,20 @@ String createJob(String queueName, Map parameters) */ Job getJob(String jobId) throws JobNotFoundException, JobQueueDataException; + /** + * Retrieves the current state of a specific job. + *

+ * If only the status is required, this method has better performance than + * {@link #getJob(String)} as it uses a cache that is only cleared on status changes, whereas + * {@link #getJob(String)} uses a cache that is cleared on any job change. + * + * @param jobId The ID of the job whose state is being queried. + * @return The current state of the job as a JobState enum. + * @throws JobNotFoundException if the job with the given ID is not found. + * @throws JobQueueDataException if there's a data storage error while fetching the job state. + */ + JobState getJobState(final String jobId) throws JobNotFoundException, JobQueueDataException; + /** * Retrieves a list of active jobs for a specific queue. * @@ -176,10 +190,10 @@ List getUpdatedJobsSince(Set jobIds, LocalDateTime since) * Checks if a job has ever been in a specific state. * * @param jobId The ID of the job to check. - * @param state The state to check for. + * @param states The states to check for. * @return true if the job has been in the specified state, false otherwise. * @throws JobQueueDataException if there's an error accessing the job data. */ - boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDataException; + boolean hasJobBeenInState(String jobId, JobState... states) throws JobQueueDataException; } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index dfc179106563..10298c44e578 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -8,9 +8,11 @@ import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; import com.dotmarketing.business.APILocator; +import com.dotmarketing.business.CacheLocator; import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; +import com.dotmarketing.util.UtilMethods; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -148,7 +150,7 @@ public class PostgresJobQueue implements JobQueue { + " WHERE id = ?"; private static final String HAS_JOB_BEEN_IN_STATE_QUERY = "SELECT " - + "EXISTS (SELECT 1 FROM job_history WHERE job_id = ? AND state = ?)"; + + "EXISTS (SELECT 1 FROM job_history WHERE job_id = ? AND state IN $??$)"; private static final String COLUMN_TOTAL_COUNT = "total_count"; @@ -224,6 +226,12 @@ public String createJob(final String queueName, final Map parame @Override public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataException { + // Check cache first + Job job = CacheLocator.getJobCache().get(jobId); + if (UtilMethods.isSet(job)) { + return job; + } + try { DotConnect dc = new DotConnect(); dc.setSQL(SELECT_JOB_BY_ID_QUERY); @@ -231,7 +239,13 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE List> results = dc.loadObjectResults(); if (!results.isEmpty()) { - return DBJobTransformer.toJob(results.get(0)); + + job = DBJobTransformer.toJob(results.get(0)); + + // Cache the job + CacheLocator.getJobCache().put(job); + + return job; } Logger.warn(this, "Job with id: " + jobId + " not found"); @@ -242,6 +256,24 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE } } + @Override + public JobState getJobState(final String jobId) + throws JobNotFoundException, JobQueueDataException { + + // Check cache first + JobState jobState = CacheLocator.getJobCache().getState(jobId); + if (UtilMethods.isSet(jobState)) { + return jobState; + } + + final var job = getJob(jobId); + + // Cache the job state + CacheLocator.getJobCache().putState(job.id(), job.state()); + + return job.state(); + } + @Override public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws JobQueueDataException { @@ -463,6 +495,11 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { || job.state() == JobState.CANCELED) { removeJobFromQueue(job.id()); } + + // Cleanup cache + CacheLocator.getJobCache().remove(job); + CacheLocator.getJobCache().removeState(job.id()); + } catch (DotDataException e) { Logger.error(this, "Database error while updating job status", e); throw new JobQueueDataException("Database error while updating job status", e); @@ -554,6 +591,10 @@ public void updateJobProgress(final String jobId, final float progress) dc.addParam(Timestamp.valueOf(LocalDateTime.now())); dc.addParam(jobId); dc.loadResult(); + + // Cleanup cache + CacheLocator.getJobCache().remove(jobId); + } catch (DotDataException e) { Logger.error(this, "Database error while updating job progress", e); throw new JobQueueDataException("Database error while updating job progress", e); @@ -575,13 +616,25 @@ public void removeJobFromQueue(final String jobId) throws JobQueueDataException } @Override - public boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDataException { + public boolean hasJobBeenInState(final String jobId, final JobState... states) + throws JobQueueDataException { + + if (states.length == 0) { + return false; + } + + String parameters = String.join(", ", Collections.nCopies(states.length, "?")); + + var query = HAS_JOB_BEEN_IN_STATE_QUERY + .replace(REPLACE_TOKEN_PARAMETERS, "(" + parameters + ")"); try { DotConnect dc = new DotConnect(); - dc.setSQL(HAS_JOB_BEEN_IN_STATE_QUERY); + dc.setSQL(query); dc.addParam(jobId); - dc.addParam(state.name()); + for (JobState state : states) { + dc.addParam(state.name()); + } List> results = dc.loadObjectResults(); if (!results.isEmpty()) { @@ -604,7 +657,7 @@ public boolean hasJobBeenInState(String jobId, JobState state) throws JobQueueDa * @return A JobPaginatedResult instance * @throws DotDataException If there is an error loading the query results */ - private static JobPaginatedResult jobPaginatedResult( + private JobPaginatedResult jobPaginatedResult( int page, int pageSize, DotConnect dc) throws DotDataException { final var results = dc.loadObjectResults(); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java index 6a6ec3650e4e..c19e19056153 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java @@ -12,6 +12,8 @@ import com.dotmarketing.util.UtilMethods; import com.dotmarketing.util.WebKeys; import com.liferay.portal.model.User; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.List; import java.util.Map; import java.util.Optional; @@ -102,4 +104,20 @@ public static HttpServletRequest generateMockRequest(final User user, final Stri return requestProxy; } + /** + * Helper method to round the progress to 3 decimal places. + * + * @param progress The progress value to round + * @return The rounded progress value + */ + public static float roundedProgress(final float progress) { + + // Round the progress to 3 decimal places + final var roundedProgress = BigDecimal.valueOf(progress) + .setScale(3, RoundingMode.HALF_UP) + .floatValue(); + + return Math.round(roundedProgress * 1000f) / 1000f; + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index 1902207ce186..534986fb7b37 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -1,5 +1,7 @@ package com.dotcms.rest.api.v1.job; +import static com.dotcms.jobs.business.util.JobUtil.roundedProgress; + import com.dotcms.jobs.business.api.JobProcessorScanner; import com.dotcms.jobs.business.api.JobQueueManagerAPI; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; @@ -8,7 +10,6 @@ import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.Queue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; @@ -19,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.liferay.portal.model.User; +import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; import java.time.format.DateTimeFormatter; @@ -32,7 +34,10 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MediaType; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.OutboundEvent; /** * Helper class for interacting with the job queue system. This class provides methods for creating, cancelling, and listing jobs. @@ -241,7 +246,7 @@ void watchJob(String jobId, Consumer watcher) { JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -273,7 +278,7 @@ JobPaginatedResult getJobs(int page, int pageSize) { JobPaginatedResult getActiveJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -289,7 +294,7 @@ JobPaginatedResult getActiveJobs(int page, int pageSize) { JobPaginatedResult getCompletedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCompletedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching completed jobs", e); } return JobPaginatedResult.builder().build(); @@ -305,7 +310,7 @@ JobPaginatedResult getCompletedJobs(int page, int pageSize) { JobPaginatedResult getCanceledJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCanceledJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching canceled jobs", e); } return JobPaginatedResult.builder().build(); @@ -321,7 +326,7 @@ JobPaginatedResult getCanceledJobs(int page, int pageSize) { JobPaginatedResult getFailedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getFailedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching failed jobs", e); } return JobPaginatedResult.builder().build(); @@ -363,9 +368,9 @@ void handleUploadIfPresent(final JobParams form, Map params, Htt * @param job The job * @return true if the job is watchable, false otherwise */ - public boolean isNotWatchable(Job job){ + boolean isNotWatchable(Job job) { return JobState.PENDING != job.state() && JobState.RUNNING != job.state() - && JobState.CANCELLING != job.state(); + && JobState.CANCEL_REQUESTED != job.state() && JobState.CANCELLING != job.state(); } /** @@ -373,14 +378,82 @@ public boolean isNotWatchable(Job job){ * @param job The job * @return The status info */ - public Map getJobStatusInfo(Job job) { + Map getJobStatusInfo(Job job) { final DateTimeFormatter isoFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME; return Map.of( - "startedAt", job.startedAt().map(isoFormatter::format).orElse("N/A"), - "finishedAt", job.completedAt().map(isoFormatter::format).orElse("N/A"), "state", job.state(), - "progress", job.progress() + "progress", roundedProgress(job.progress()), + "startedAt", job.startedAt().map(isoFormatter::format).orElse("N/A"), + "finishedAt", job.completedAt().map(isoFormatter::format).orElse("N/A") ); } + /** + * Get the job for the given ID + * + * @param jobId The ID of the job + * @return The job or null if it doesn't exist + */ + Job getJobForSSE(final String jobId) throws DotDataException { + + Job job = null; + + try { + job = getJob(jobId); + } catch (DoesNotExistException e) { + // ignore + } + + return job; + } + + /** + * Send an error event and close the connection + * + * @param errorName The name of the error event + * @param errorCode The error code + * @param eventOutput The event output + */ + void sendErrorAndClose(final String errorName, final String errorCode, + final EventOutput eventOutput) { + + try { + OutboundEvent event = new OutboundEvent.Builder() + .mediaType(MediaType.TEXT_HTML_TYPE) + .name(errorName) + .data(String.class, errorCode) + .build(); + eventOutput.write(event); + closeSSEConnection(eventOutput); + } catch (IOException e) { + Logger.error(this, "Error sending error event", e); + closeSSEConnection(eventOutput); + } + } + + /** + * Close the SSE connection + * + * @param eventOutput The event output + */ + void closeSSEConnection(final EventOutput eventOutput) { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(this, "Error closing SSE connection", e); + } + } + + /** + * Check if a job is in a terminal state + * + * @param state The state of the job + * @return true if the job is in a terminal state, false otherwise + */ + boolean isTerminalState(final JobState state) { + return state == JobState.COMPLETED || + state == JobState.FAILED || + state == JobState.CANCELED; + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index c298d24d50a6..d2b537e94f03 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -4,15 +4,14 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; -import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; -import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import graphql.VisibleForTesting; import java.io.IOException; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.BeanParam; @@ -34,18 +33,21 @@ public class JobQueueResource { private final WebResource webResource; - private final JobQueueHelper helper; + private final SSEConnectionManager sseConnectionManager; @Inject - public JobQueueResource(final JobQueueHelper helper) { - this(new WebResource(), helper); + public JobQueueResource(final JobQueueHelper helper, + final SSEConnectionManager sseConnectionManager) { + this(new WebResource(), helper, sseConnectionManager); } @VisibleForTesting - public JobQueueResource(WebResource webResource, JobQueueHelper helper) { + public JobQueueResource(WebResource webResource, JobQueueHelper helper, + SSEConnectionManager sseConnectionManager) { this.webResource = webResource; this.helper = helper; + this.sseConnectionManager = sseConnectionManager; } @POST @@ -162,30 +164,29 @@ public EventOutput monitorJob(@Context HttpServletRequest request, .rejectWhenNoUser(true) .init(); - Job job = null; + final EventOutput eventOutput = new EventOutput(); + try { - job = helper.getJob(jobId); - } catch (DotDataException | DoesNotExistException e) { - // ignore - } + Job job = helper.getJobForSSE(jobId); - final EventOutput eventOutput = new EventOutput(); + if (job == null) { + helper.sendErrorAndClose("job-not-found", "404", eventOutput); + return eventOutput; + } + + if (helper.isNotWatchable(job)) { + helper.sendErrorAndClose(String.format("job-not-watchable [%s]", + job.state()), "400", eventOutput); + return eventOutput; + } - if (job == null || helper.isNotWatchable(job)) { - try { - OutboundEvent event = new OutboundEvent.Builder() - .mediaType(MediaType.TEXT_HTML_TYPE) - .name("job-not-found") - .data(String.class, "404") - .build(); - eventOutput.write(event); - eventOutput.close(); - } catch (IOException e) { - Logger.error(this, "Error closing SSE connection", e); + if (!sseConnectionManager.canAcceptNewConnection(jobId)) { + helper.sendErrorAndClose("too-many-connections", "429", eventOutput); + return eventOutput; } - } else { + // Callback for watching job updates and sending them to the client - helper.watchJob(job.id(), watched -> { + Consumer jobWatcher = watched -> { if (!eventOutput.isClosed()) { try { OutboundEvent event = new OutboundEvent.Builder() @@ -194,13 +195,30 @@ public EventOutput monitorJob(@Context HttpServletRequest request, .data(Map.class, helper.getJobStatusInfo(watched)) .build(); eventOutput.write(event); + + // If job is complete/failed/cancelled, close the connection + if (helper.isTerminalState(watched.state())) { + sseConnectionManager.closeJobConnections(jobId); + } + } catch (IOException e) { Logger.error(this, "Error writing SSE event", e); - throw new DotRuntimeException(e); + sseConnectionManager.closeJobConnections(jobId); } } - }); + }; + + // Register the connection and watcher + sseConnectionManager.addConnection(jobId, eventOutput); + + // Start watching the job + helper.watchJob(job.id(), jobWatcher); + + } catch (DotDataException e) { + Logger.error(this, "Error setting up job monitor", e); + helper.closeSSEConnection(eventOutput); } + return eventOutput; } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java new file mode 100644 index 000000000000..104b824f6429 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java @@ -0,0 +1,270 @@ +package com.dotcms.rest.api.v1.job; + +import com.dotmarketing.util.Config; +import com.dotmarketing.util.Logger; +import io.vavr.Lazy; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import org.glassfish.jersey.media.sse.EventOutput; + +/** + * Manages Server-Sent Events (SSE) connections for job monitoring. This class provides + * functionality for tracking, limiting, and cleaning up SSE connections across multiple jobs. + * + *

Key features include: + *

    + *
  • Connection limits per job and system-wide
  • + *
  • Automatic connection timeout and cleanup
  • + *
  • Thread-safe connection management
  • + *
  • Proper resource cleanup on shutdown
  • + *
+ * + *

Configuration properties: + *

    + *
  • {@code MAX_SSE_CONNECTIONS_PER_JOB} - Maximum number of concurrent connections per job (default: 5)
  • + *
  • {@code MAX_SSE_TOTAL_CONNECTIONS} - Maximum total concurrent connections across all jobs (default: 50)
  • + *
  • {@code SSE_CONNECTION_TIMEOUT_MINUTES} - Connection timeout in minutes (default: 30)
  • + *
+ * + *

Usage example: + *

{@code
+ * SSEConnectionManager manager = new SSEConnectionManager();
+ *
+ * // Check if new connection can be accepted
+ * if (manager.canAcceptNewConnection(jobId)) {
+ *     // Add new connection
+ *     manager.addConnection(jobId, eventOutput);
+ * }
+ *
+ * // Close connections when job completes
+ * manager.closeJobConnections(jobId);
+ * }
+ */ +@ApplicationScoped +public class SSEConnectionManager { + + // Add status tracking + private volatile boolean isShutdown = false; + + private static final Lazy MAX_SSE_CONNECTIONS_PER_JOB = + Lazy.of(() -> Config.getIntProperty("MAX_SSE_CONNECTIONS_PER_JOB", 5)); + + private static final Lazy MAX_SSE_TOTAL_CONNECTIONS = + Lazy.of(() -> Config.getIntProperty("MAX_SSE_TOTAL_CONNECTIONS", 50)); + + private static final Lazy SSE_CONNECTION_TIMEOUT_MINUTES = + Lazy.of(() -> Config.getIntProperty("SSE_CONNECTION_TIMEOUT_MINUTES", 30)); + + private final ConcurrentMap> jobConnections = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService timeoutExecutor = + Executors.newSingleThreadScheduledExecutor(); + + /** + * Shuts down the SSE connection manager and cleans up all resources. This method closes all + * active connections and shuts down the timeout executor. After shutdown, no new connections + * can be added. + */ + @PreDestroy + public void shutdown() { + + isShutdown = true; + + try { + closeAllConnections(); + } finally { + timeoutExecutor.shutdown(); + try { + if (!timeoutExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + timeoutExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + timeoutExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Checks if a new SSE connection can be accepted for the given job. This method verifies both + * per-job and system-wide connection limits. + * + * @param jobId The ID of the job for which to check connection availability + * @return true if a new connection can be accepted, false otherwise + */ + public boolean canAcceptNewConnection(String jobId) { + if (getTotalConnections() >= MAX_SSE_TOTAL_CONNECTIONS.get()) { + return false; + } + + Set connections = jobConnections.get(jobId); + return connections == null || connections.size() < MAX_SSE_CONNECTIONS_PER_JOB.get(); + } + + /** + * Adds a new SSE connection for a job. The connection will be automatically closed after the + * configured timeout period. + * + * @param jobId The ID of the job to monitor + * @param eventOutput The EventOutput instance representing the SSE connection + * @throws IllegalStateException if the manager is shut down + */ + public void addConnection(String jobId, EventOutput eventOutput) { + + if (isShutdown) { + throw new IllegalStateException("SSEConnectionManager is shut down"); + } + + SSEConnection connection = new SSEConnection(jobId, eventOutput); + jobConnections.computeIfAbsent(jobId, k -> ConcurrentHashMap.newKeySet()).add(connection); + + // Schedule connection timeout + timeoutExecutor.schedule(() -> { + try { + removeConnection(jobId, connection); + } catch (Exception e) { + Logger.error(this, "Error removing expired connection", e); + } + }, SSE_CONNECTION_TIMEOUT_MINUTES.get(), TimeUnit.MINUTES); + } + + /** + * Removes a specific SSE connection for a job. If this was the last connection for the job, the + * job entry is removed from tracking. + * + * @param jobId The ID of the job + * @param connection The connection to remove + */ + public void removeConnection(String jobId, SSEConnection connection) { + Set connections = jobConnections.get(jobId); + if (connections != null) { + connections.remove(connection); + connection.close(); + + if (connections.isEmpty()) { + jobConnections.remove(jobId); + } + } + } + + /** + * Gets the total number of active SSE connections across all jobs. + * + * @return The total number of active connections + */ + private int getTotalConnections() { + return jobConnections.values().stream() + .mapToInt(Set::size) + .sum(); + } + + /** + * Closes all active SSE connections and clears connection tracking. + */ + private void closeAllConnections() { + jobConnections.values().forEach(connections -> + connections.forEach(SSEConnection::close) + ); + jobConnections.clear(); + } + + /** + * Closes all SSE connections for a specific job. + * + * @param jobId The ID of the job whose connections should be closed + */ + public void closeJobConnections(String jobId) { + Set connections = jobConnections.remove(jobId); + if (connections != null) { + connections.forEach(SSEConnection::close); + } + } + + /** + * Gets the number of active connections for a specific job. + * + * @param jobId The ID of the job + * @return The number of active connections for the job + */ + public int getConnectionCount(String jobId) { + Set connections = jobConnections.get(jobId); + return connections != null ? connections.size() : 0; + } + + /** + * Gets information about the current state of SSE connections. + * + * @return A map containing connection statistics: + * - totalConnections: Total number of active connections + * - activeJobs: Number of jobs with active connections + */ + public Map getConnectionInfo() { + return Map.of( + "totalConnections", getTotalConnections(), + "activeJobs", jobConnections.size() + ); + } + + /** + * Represents a single SSE connection for a job. Each connection tracks its creation time and + * handles its own cleanup. + */ + public static class SSEConnection { + + private final String jobId; + private final EventOutput eventOutput; + private final LocalDateTime createdAt; + + /** + * Creates a new SSE connection. + * + * @param jobId The ID of the job this connection is monitoring + * @param eventOutput The EventOutput instance representing the SSE connection + */ + public SSEConnection(String jobId, EventOutput eventOutput) { + this.jobId = jobId; + this.eventOutput = eventOutput; + this.createdAt = LocalDateTime.now(); + } + + /** + * Closes this SSE connection. + */ + public void close() { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(SSEConnection.class, "Error closing SSE connection", e); + } + } + + /** + * Checks if this connection has exceeded its timeout period. + * + * @return true if the connection has expired, false otherwise + */ + public boolean isExpired() { + return LocalDateTime.now().isAfter( + createdAt.plusMinutes(SSE_CONNECTION_TIMEOUT_MINUTES.get())); + } + + /** + * Gets the ID of the job this connection is monitoring. + * + * @return The job ID + */ + public String getJobId() { + return jobId; + } + } + +} diff --git a/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java b/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java index 81bd05fe5b06..d99cbf8f3ed5 100644 --- a/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java +++ b/dotCMS/src/main/java/com/dotmarketing/business/CacheLocator.java @@ -15,6 +15,8 @@ import com.dotcms.experiments.business.ExperimentsCacheImpl; import com.dotcms.graphql.GraphQLCache; import com.dotcms.graphql.business.GraphQLSchemaCache; +import com.dotcms.jobs.business.job.JobCache; +import com.dotcms.jobs.business.job.JobCacheImpl; import com.dotcms.notifications.business.NewNotificationCache; import com.dotcms.notifications.business.NewNotificationCacheImpl; import com.dotcms.publisher.assets.business.PushedAssetsCache; @@ -364,6 +366,13 @@ public static ExperimentsCache getExperimentsCache() { return (ExperimentsCache) getInstance(CacheIndex.EXPERIMENTS_CACHE); } + /** + * This will get you an instance of the {@link JobCache} singleton cache. + */ + public static JobCache getJobCache() { + return (JobCache) getInstance(CacheIndex.JOB_CACHE); + } + /** * The legacy cache administrator will invalidate cache entries within a cluster * on a put where the non legacy one will not. @@ -476,8 +485,8 @@ enum CacheIndex VariantCache("VariantCache"), EXPERIMENTS_CACHE("ExperimentsCache"), CHAINABLE_404_STORAGE_CACHE("Chainable404StorageCache"), - - Javascript("Javascript"); + Javascript("Javascript"), + JOB_CACHE("JobCache"); Cachable create() { switch(this) { @@ -533,7 +542,7 @@ Cachable create() { case EXPERIMENTS_CACHE: return new ExperimentsCacheImpl(); case CHAINABLE_404_STORAGE_CACHE: return new Chainable404StorageCache(); case Javascript: return new JsCache(); - + case JOB_CACHE: return new JobCacheImpl(); } throw new AssertionError("Unknown Cache index: " + this); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 210028bdee69..97c3387fde78 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -28,7 +28,6 @@ import javax.inject.Inject; import org.awaitility.Awaitility; import org.jboss.weld.junit5.EnableWeld; -import org.jboss.weld.junit5.WeldJunit5Extension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -38,7 +37,6 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.extension.ExtendWith; /** * Integration tests for the JobQueueManagerAPI. @@ -50,6 +48,8 @@ @TestInstance(Lifecycle.PER_CLASS) public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBaseTest { + private static int attempts = 0; + @Inject JobQueueManagerAPI jobQueueManagerAPI; @@ -85,6 +85,9 @@ void reset() { if(null != jobQueueManagerAPI) { jobQueueManagerAPI.getCircuitBreaker().reset(); } + + // Reset retry attempts + attempts = 0; } /** @@ -185,8 +188,6 @@ void test_JobRetry() throws Exception { }); } - - /** * Method to test: Job failure handling in JobQueueManagerAPI * Given Scenario: A job is created that is designed to fail @@ -473,7 +474,6 @@ public Map getResultMetadata(Job job) { static class RetryingJobProcessor implements JobProcessor { public static final int MAX_RETRIES = 3; - private int attempts = 0; public RetryingJobProcessor() { // needed for instantiation purposes diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 8525440886d3..ccfe76f90c60 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -13,9 +13,11 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -23,6 +25,7 @@ import static org.mockito.Mockito.when; import com.dotcms.jobs.business.api.events.EventProducer; +import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; @@ -43,6 +46,8 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.system.event.local.business.LocalSystemEventsAPI; +import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; import java.util.ArrayList; @@ -65,6 +70,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import org.mockito.MockedStatic; public class JobQueueManagerAPITest { @@ -1182,116 +1188,170 @@ public void test_CircuitBreaker_Reset() throws Exception { public void test_simple_cancelJob2() throws DotDataException, JobQueueException, JobCancellationException { - // Set up the job queue manager to return our mock cancellable processor - final String testQueue = "CancellableTestQueue"; + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { - // Create a mock cancellable processor - final String jobIdIn = "job2"; - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); - // Create a mock job - Job mockJob = mock(Job.class); - when(mockJobQueue.getJob(jobIdIn)).thenReturn(mockJob); - when(mockJob.queueName()).thenReturn(testQueue); - when(mockJob.id()).thenReturn(jobIdIn); - when(mockJob.withState(any())).thenReturn(mockJob); + // Set up the job queue manager to return our mock cancellable processor + final String testQueue = "CancellableTestQueue"; + final String jobIdIn = "job2"; - jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Create a job so we can cancel it - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); - assertEquals(jobIdIn, jobIdOut); - // Perform the cancellation - jobQueueManagerAPI.cancelJob(jobIdOut); - // Verify that the cancel method was called on our mock processor - verify(mockCancellableProcessor).cancel(mockJob); - } + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobIdIn); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); - /** - * Method to test: Job cancellation in JobQueueManagerAPI - * Given Scenario: Running job is canceled - * ExpectedResult: Job is successfully canceled and its state transitions are correct - */ - @Test - public void test_complex_cancelJob() throws Exception { + // Mock job queue operations + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); + when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); + doNothing().when(mockJobQueue).updateJobStatus(any(Job.class)); - // Create a mock job - Job mockJob = mock(Job.class); - final String jobId = "job5644"; - when(mockJob.id()).thenReturn(jobId); - final String testQueue = "myTestQueue"; - when(mockJob.queueName()).thenReturn(testQueue); + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); - // Configure JobQueue - when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCELLING))).thenReturn(true); - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); - // List to capture job state updates - List stateUpdates = new CopyOnWriteArrayList<>(); - - when(mockJob.withState(any())).thenAnswer(inv -> { - stateUpdates.add(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsRunning()).thenAnswer(inv -> { - stateUpdates.add(JobState.RUNNING); - return mockJob; - }); - when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.CANCELED); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.markAsFailed(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.FAILED); - return mockJob; - }); - when(mockJob.progress()).thenReturn(0f); - when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); + jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Register the test processor - jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + // Create a job so we can cancel it + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + assertEquals(jobIdIn, jobIdOut); - // Configure circuit breaker - when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Perform the cancellation + jobQueueManagerAPI.cancelJob(jobIdOut); - // Start the job queue manager - jobQueueManagerAPI.start(); + // Verify that the cancel method was called on our mock processor + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + verify(mockCancellableProcessor).cancel(mockJob); + }); - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + } + } - final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); - assertTrue(instance.isPresent()); - // Use our TestJobProcessor - final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + /** + * Method to test: Job cancellation in JobQueueManagerAPI + * Given Scenario: Running job is canceled + * ExpectedResult: Job is successfully canceled and its state transitions are correct + */ + @Test + public void test_complex_cancelJob() throws Exception { - // Wait for the job to start processing - Awaitility.await() - .atMost(5, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { + + final String testQueue = "myTestQueue"; + final String jobId = "job5644"; + + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobId); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); + + // Configure JobQueue + when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); + when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); + when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCEL_REQUESTED), + eq(JobState.CANCELLING))).thenReturn(true); + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + + // List to capture job state updates + List stateUpdates = new CopyOnWriteArrayList<>(); + + when(mockJob.withState(any())).thenAnswer(inv -> { + stateUpdates.add(inv.getArgument(0)); + return mockJob; + }); + when(mockJob.markAsRunning()).thenAnswer(inv -> { + stateUpdates.add(JobState.RUNNING); + return mockJob; + }); + when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.CANCELED); + return mockJob; + }); + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.COMPLETED); + return mockJob; + }); + when(mockJob.markAsFailed(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.FAILED); + return mockJob; + }); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); + when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn( + mockJob); + + when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) + .thenAnswer(invocation -> Collections.singletonList(mockJob)); + + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); + + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); + + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); + + // Register the test processor + jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + + // Configure circuit breaker + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + + // Start the job queue manager + jobQueueManagerAPI.start(); + + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + + final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); + assertTrue(instance.isPresent()); + // Use our TestJobProcessor + final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + + // Wait for the job to start processing + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); - // Cancel the job - jobQueueManagerAPI.cancelJob(jobId); + // Cancel the job + jobQueueManagerAPI.cancelJob(jobId); - // Wait for the job to complete (which should be due to cancellation) - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingCompleted(100, TimeUnit.MILLISECONDS)); + // Wait for the job to complete (which should be due to cancellation) + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingCompleted(100, + TimeUnit.MILLISECONDS)); - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> stateUpdates.contains(JobState.CANCELED)); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> stateUpdates.contains(JobState.CANCELED)); - // Clean up - jobQueueManagerAPI.close(); + // Clean up + jobQueueManagerAPI.close(); + } } /** diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java index e2889dc18f46..e25ee222c34d 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java @@ -5,6 +5,7 @@ import com.dotcms.contenttype.model.type.ContentType; import com.dotcms.datagen.TestDataUtils; +import com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.DefaultProgressTracker; @@ -27,6 +28,7 @@ import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.jboss.weld.junit5.EnableWeld; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -73,6 +75,188 @@ static void setUp() throws Exception { request = JobUtil.generateMockRequest(systemUser, defaultSite.getHostname()); } + /** + * Tests the preview mode of the content import process. This test: + *
    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in preview mode
  • + *
  • Verifies the preview results and metadata
  • + *
  • Verifies there is no content creation in the database
  • + *
+ * + *

The test ensures that preview mode properly validates the content + * without actually creating it in the system using the content type variable instead of the ID. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_using_content_type_variable() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "1", testContentType.variable(), + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Preview metadata should not be null"); + assertNotNull(metadata.get("errors"), "Preview metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Preview metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Preview metadata errors should be empty"); + + // Verify no content was created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(0, importedContent.size(), "Imported content should have no items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Scenario: Test the preview mode of the content import process with an invalid content type. + *

+ * Expected: A JobValidationException should be thrown. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_invalid_content_type_variable() throws Exception { + + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "1", "doesNotExist", + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + try { + // Process the job in preview mode + processor.process(testJob); + Assertions.fail("A JobValidationException should have been thrown here."); + } catch (Exception e) { + Assertions.assertInstanceOf(JobValidationException.class, e); + } + } + + /** + * Tests the preview mode of the content import process. This test: + *

    + *
  • Creates a test content type
  • + *
  • Generates a test CSV file with sample content
  • + *
  • Processes the import in preview mode
  • + *
  • Verifies the preview results and metadata
  • + *
  • Verifies there is no content creation in the database
  • + *
+ * + *

The test ensures that preview mode properly validates the content + * without actually creating it in the system using the language ISO code instead of the ID. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_using_language_iso_code() throws Exception { + + ContentType testContentType = null; + + try { + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test content type + testContentType = createTestContentType(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "en-us", testContentType.variable(), + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + // Process the job in preview mode + processor.process(testJob); + + // Verify preview results + Map metadata = processor.getResultMetadata(testJob); + assertNotNull(metadata, "Preview metadata should not be null"); + assertNotNull(metadata.get("errors"), "Preview metadata errors should not be null"); + assertNotNull(metadata.get("results"), "Preview metadata results should not be null"); + assertEquals(0, ((ArrayList) metadata.get("errors")).size(), + "Preview metadata errors should be empty"); + + // Verify no content was created + final var importedContent = findImportedContent(testContentType.id()); + assertNotNull(importedContent, "Imported content should not be null"); + assertEquals(0, importedContent.size(), "Imported content should have no items"); + + } finally { + if (testContentType != null) { + // Clean up test content type + APILocator.getContentTypeAPI(systemUser).delete(testContentType); + } + } + } + + /** + * Scenario: Test the preview mode of the content import process with an invalid language. + *

+ * Expected: A JobValidationException should be thrown. + * + * @throws Exception if there's an error during the test execution + */ + @Test + void test_process_preview_invalid_language() throws Exception { + + // Initialize processor + final var processor = new ImportContentletsProcessor(); + + // Create test CSV file + File csvFile = createTestCsvFile(); + + // Create test job + final var testJob = createTestJob( + csvFile, "preview", "12345", "doesNotExist", + "b9d89c80-3d88-4311-8365-187323c96436" + ); + + try { + // Process the job in preview mode + processor.process(testJob); + Assertions.fail("A JobValidationException should have been thrown here."); + } catch (Exception e) { + Assertions.assertInstanceOf(JobValidationException.class, e); + } + } + /** * Tests the preview mode of the content import process. This test: *

    @@ -105,7 +289,8 @@ void test_process_preview() throws Exception { // Create test job final var testJob = createTestJob( - csvFile, "preview", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + csvFile, "preview", "1", testContentType.id(), + "b9d89c80-3d88-4311-8365-187323c96436" ); // Process the job in preview mode @@ -163,7 +348,8 @@ void test_process_publish() throws Exception { // Create test job final var testJob = createTestJob( - csvFile, "publish", testContentType.id(), "b9d89c80-3d88-4311-8365-187323c96436" + csvFile, "publish", "1", testContentType.id(), + "b9d89c80-3d88-4311-8365-187323c96436" ); // Process the job in preview mode @@ -205,14 +391,16 @@ private ContentType createTestContentType() { * * @param csvFile The CSV file containing the content to be imported * @param cmd The command to execute ('preview' or 'publish') - * @param contentTypeId The ID of the content type for the imported content + * @param contentType The content type for the imported content + * @param language The language of the imported content * @param workflowActionId The ID of the workflow action to be applied * @return A configured {@link Job} instance ready for processing * @throws IOException if there's an error reading the CSV file * @throws DotSecurityException if there's a security violation during job creation */ - private Job createTestJob(final File csvFile, final String cmd, final String contentTypeId, - final String workflowActionId) throws IOException, DotSecurityException { + private Job createTestJob(final File csvFile, final String cmd, final String language, + final String contentType, final String workflowActionId) + throws IOException, DotSecurityException { final Map jobParameters = new HashMap<>(); @@ -221,9 +409,11 @@ private Job createTestJob(final File csvFile, final String cmd, final String con jobParameters.put("userId", systemUser.getUserId()); jobParameters.put("siteName", defaultSite.getHostname()); jobParameters.put("siteIdentifier", defaultSite.getIdentifier()); - jobParameters.put("contentType", contentTypeId); + jobParameters.put("contentType", contentType); jobParameters.put("workflowActionId", workflowActionId); - jobParameters.put("language", "1"); + if (language != null) { + jobParameters.put("language", language); + } final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); try (final var fileInputStream = new FileInputStream(csvFile)) { diff --git a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json index cbc8bc3971d6..14ecba3220a9 100644 --- a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json +++ b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json @@ -1,6 +1,6 @@ { "info": { - "_postman_id": "cc2de2d8-aecf-4063-a97c-089965ba573d", + "_postman_id": "be9c354a-6c94-4b10-be0e-bc0c1b324c24", "name": "JobQueueResource API Tests", "description": "Postman collection for testing the JobQueueResource API endpoints.", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", @@ -772,7 +772,7 @@ "response": [] }, { - "name": "Cancel Job", + "name": "Create Failing Job", "event": [ { "listen": "test", @@ -782,24 +782,10 @@ " pm.response.to.have.status(200);", "});", "", - "// Check if cancellation message is returned", "var jsonData = pm.response.json();", - "pm.test(\"Job cancelled successfully\", function () {", - " pm.expect(jsonData.entity).to.include('Cancellation request successfully sent to job');", - "});", - "", - "var jobId = pm.collectionVariables.get(\"jobId\");", - "console.log(\" At the time this request was sent \" + jobId);", - "pm.collectionVariables.set(\"cancelledJobId\",jobId);" - ], - "type": "text/javascript", - "packages": {} - } - }, - { - "listen": "prerequest", - "script": { - "exec": [ + "pm.expect(jsonData.entity).to.be.a('String');", + "// Save jobId to environment variable", + "pm.environment.set(\"failingJobId\", jsonData.entity);", "" ], "type": "text/javascript", @@ -810,8 +796,17 @@ "request": { "method": "POST", "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"fail\": true\n}", + "options": { + "raw": { + "language": "json" + } + } + }, "url": { - "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", + "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", "host": [ "{{baseUrl}}" ], @@ -819,16 +814,15 @@ "api", "v1", "jobs", - "{{jobId}}", - "cancel" + "failSuccess" ] }, - "description": "Cancels a specific job." + "description": "Creates a new job in the specified queue (Create Failing Job)" }, "response": [] }, { - "name": "Create Failing Job", + "name": "Create Success Job", "event": [ { "listen": "test", @@ -841,7 +835,7 @@ "var jsonData = pm.response.json();", "pm.expect(jsonData.entity).to.be.a('String');", "// Save jobId to environment variable", - "pm.environment.set(\"failingJobId\", jsonData.entity);", + "pm.environment.set(\"successJobId\", jsonData.entity);", "" ], "type": "text/javascript", @@ -854,7 +848,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"fail\": true\n}", + "raw": "{}", "options": { "raw": { "language": "json" @@ -873,12 +867,32 @@ "failSuccess" ] }, - "description": "Creates a new job in the specified queue (Create Failing Job)" + "description": "Creates a new job in the specified queue (Create a job that will finish sucessfully)" }, "response": [] }, { - "name": "Create Success Job", + "name": "Allow job to run before stopping", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "https://postman-echo.com/delay/10", + "protocol": "https", + "host": [ + "postman-echo", + "com" + ], + "path": [ + "delay", + "10" + ] + } + }, + "response": [] + }, + { + "name": "Cancel Job", "event": [ { "listen": "test", @@ -888,10 +902,24 @@ " pm.response.to.have.status(200);", "});", "", + "// Check if cancellation message is returned", "var jsonData = pm.response.json();", - "pm.expect(jsonData.entity).to.be.a('String');", - "// Save jobId to environment variable", - "pm.environment.set(\"successJobId\", jsonData.entity);", + "pm.test(\"Job cancelled successfully\", function () {", + " pm.expect(jsonData.entity).to.include('Cancellation request successfully sent to job');", + "});", + "", + "var jobId = pm.collectionVariables.get(\"jobId\");", + "console.log(\" At the time this request was sent \" + jobId);", + "pm.environment.set(\"cancelledJobId\",jobId);" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ "" ], "type": "text/javascript", @@ -902,17 +930,8 @@ "request": { "method": "POST", "header": [], - "body": { - "mode": "raw", - "raw": "{}", - "options": { - "raw": { - "language": "json" - } - } - }, "url": { - "raw": "{{baseUrl}}/api/v1/jobs/failSuccess", + "raw": "{{baseUrl}}/api/v1/jobs/{{jobId}}/cancel", "host": [ "{{baseUrl}}" ], @@ -920,10 +939,11 @@ "api", "v1", "jobs", - "failSuccess" + "{{jobId}}", + "cancel" ] }, - "description": "Creates a new job in the specified queue (Create a job that will finish sucessfully)" + "description": "Cancels a specific job." }, "response": [] }, @@ -1018,27 +1038,79 @@ "status" ] }, - "description": "Retrieves the status of a specific job." + "description": "Waits for the job to finish" }, "response": [] }, { - "name": "Monitor Non Existing Job", + "name": "Waiting Job to fail", "event": [ { "listen": "test", "script": { "exec": [ - "pm.test(\"Status code is 200\", function () {", + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is \"FAILED\"", + "if (response.entity.state === \"FAILED\") {", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to fail\");", + " console.log(`Job still processing, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in processing state after ${retryCount} attempts`);", + " });", + "}", + "", + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", " pm.response.to.have.status(200);", "});", "", - "pm.test(\"Response contains job-not-found event and 404 data\", function () {", - " const responseText = pm.response.text();", - " pm.expect(responseText).to.include(\"event: job-not-found\");", - " pm.expect(responseText).to.include(\"data: 404\");", - "});", - "" + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", + "}", + "", + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" ], "type": "text/javascript", "packages": {} @@ -1047,14 +1119,9 @@ ], "request": { "method": "GET", - "header": [ - { - "key": "Accept", - "value": "text/event-stream" - } - ], + "header": [], "url": { - "raw": "{{baseUrl}}/api/v1/jobs/nonExistingJob/monitor", + "raw": "{{baseUrl}}/api/v1/jobs/{{failingJobId}}/status", "host": [ "{{baseUrl}}" ], @@ -1062,99 +1129,67 @@ "api", "v1", "jobs", - "nonExistingJob", - "monitor" + "{{failingJobId}}", + "status" ] }, - "description": "Monitors a specific job using Server-Sent Events (SSE)." + "description": "Waits for the job to finish" }, "response": [] }, { - "name": "Get Job Status Expect Cancel", + "name": "Waiting Job to be canceled", "event": [ { "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Number of times to retry", - "const retryDelay = 4000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variable", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // fallback to baseURL if serverURL is not defined", - "", - "// Assuming 'jobId' is set as an environment variable", - "const jobId = pm.collectionVariables.get('cancelledJobId');", - "const checkUrl = `${serverURL}/api/v1/jobs/${jobId}/status`;", - "const cancelUrl = `${serverURL}/api/v1/jobs/${jobId}/cancel`;", - "", - "// Function to check the job state", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"checkURL url :: \"+checkUrl); ", - " console.log(\"cancelUrl url :: \"+cancelUrl); ", - " const token = pm.collectionVariables.get('jwt');", - "", - " pm.sendRequest({", - " url:checkUrl,", - " method:'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, ", - " function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job status:\", err);", - " return;", - " }", - " ", - " let jsonData = response.json();", - " const jobState = jsonData.entity.state;", - " console.log(jobState);", - " ", - " // Test for \"CANCELED\" state", - " if (jobState === \"CANCELED\") {", - " pm.test(\"Job has been CANCELED\", function () {", - " pm.expect(jobState).to.eql(\"CANCELED\");", - " });", - " console.log(\"Job has been successfully canceled.\");", - " } else if (retriesLeft > 0) {", - " console.log(\" retriesLeft :: \"+retriesLeft);", - " // Send a cancel POST request and retry", - " pm.sendRequest({", - " url: cancelUrl,", - " method: 'POST',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (cancelErr, cancelResponse) {", - " if (cancelErr) {", - " console.error(\"Error sending cancel request:\", cancelErr);", - " } else {", - " console.log(`Cancel request sent. Status: ${cancelResponse.status}`);", - " }", - " ", - " // Wait for a delay and then check the status again", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " });", - " } else {", - " // If maximum retries are reached and job is still not canceled", - " pm.test(\"Job has not been CANCELED after maximum retries\", function () {", - " pm.expect(jobState).to.eql(\"CANCELED\");", - " });", - " console.warn(\"Job status is still not 'CANCELED' after maximum retries.\");", - " }", + "const maxTimeout = 30000; // 10 seconds", + "const maxRetries = 10; // Maximum number of retry attempts", + "const startTime = parseInt(pm.environment.get(\"startTime\"));", + "const retryCount = parseInt(pm.environment.get(\"retryCount\"));", + "const elapsedTime = Date.now() - startTime;", + "", + "console.log(`Attempt ${retryCount + 1}, Elapsed time: ${elapsedTime}ms`);", + "", + "var response = pm.response.json();", + "console.log(\"Current job state:\", response.entity.state);", + " ", + "// Check if job status is \"CANCELED\"", + "if (response.entity.state === \"CANCELED\") {", + " // Clear environment variables once done", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + "} else if (elapsedTime < maxTimeout && retryCount < maxRetries) {", + " // Increment retry count", + " pm.environment.set(\"retryCount\", retryCount + 1);", + " ", + " setTimeout(function(){", + " console.log(\"Sleeping for 3 seconds before next request.\");", + " }, 3000);", + " postman.setNextRequest(\"Waiting Job to be canceled\");", + " console.log(`Job still processing, retrying... (${maxTimeout - elapsedTime}ms remaining)`);", + "} else {", + " // If we exceed the max timeout or max retries, fail the test", + " const timeoutReason = elapsedTime >= maxTimeout ? \"timeout\" : \"max retries\";", + " pm.environment.unset(\"startTime\");", + " pm.environment.unset(\"retryCount\");", + " pm.test(`Job state check failed due to ${timeoutReason}`, function () {", + " pm.expect.fail(`${timeoutReason} reached after ${elapsedTime}ms. Job still in processing state after ${retryCount} attempts`);", " });", "}", "", - "// Initial job state check", - "checkJobState(maxRetries);", - "" + "// Add response validation", + "pm.test(\"Response is successful\", function () {", + " pm.response.to.be.success;", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response has the correct structure\", function () {", + " const response = pm.response.json();", + " pm.expect(response).to.have.property('entity');", + " pm.expect(response.entity).to.have.property('state');", + "});" ], "type": "text/javascript", "packages": {} @@ -1164,15 +1199,13 @@ "listen": "prerequest", "script": { "exec": [ - "function sleep(milliseconds) {", - " const start = Date.now();", - " while (Date.now() - start < milliseconds) {", - " // Busy-wait loop that blocks the execution", - " }", + "if (!pm.environment.get(\"startTime\")) {", + " pm.environment.set(\"startTime\", Date.now());", "}", "", - "", - "sleep(9000);" + "if (!pm.environment.get(\"retryCount\")) {", + " pm.environment.set(\"retryCount\", 0);", + "}" ], "type": "text/javascript", "packages": {} @@ -1195,7 +1228,55 @@ "status" ] }, - "description": "Retrieves the status of a specific job. We expect to get one in status Canceled." + "description": "Waits for the job to finish" + }, + "response": [] + }, + { + "name": "Monitor Non Existing Job", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response contains job-not-found event and 404 data\", function () {", + " const responseText = pm.response.text();", + " pm.expect(responseText).to.include(\"event: job-not-found\");", + " pm.expect(responseText).to.include(\"data: 404\");", + "});", + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [ + { + "key": "Accept", + "value": "text/event-stream" + } + ], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/nonExistingJob/monitor", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "nonExistingJob", + "monitor" + ] + }, + "description": "Monitors a specific job using Server-Sent Events (SSE)." }, "response": [] }, @@ -1207,7 +1288,7 @@ "script": { "exec": [ "// Get the expected job ID from collection variables", - "const jobId = pm.collectionVariables.get('cancelledJobId');", + "const jobId = pm.environment.get('cancelledJobId');", "", "// Parse the response JSON", "const response = pm.response.json();", @@ -1284,6 +1365,9 @@ "listen": "test", "script": { "exec": [ + "// Get the expected job ID from collection variables", + "const jobId = pm.environment.get('successJobId');", + "", "// Parse the response JSON", "const response = pm.response.json();", "", @@ -1305,7 +1389,7 @@ "", "// Validate that the job ID in the response matches the expected job ID", "pm.test(\"Job ID should match expected job ID\", function () {", - " pm.expect(response.entity.jobs[0].id).to.eql(pm.environment.get('successJobId'));", + " pm.expect(response.entity.jobs[0].id).to.eql(jobId);", "});" ], "type": "text/javascript", @@ -1359,74 +1443,32 @@ "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Maximum number of retries", - "const retryDelay = 2000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variables", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // Fallback to baseURL if serverURL is not defined", - "", - "// Define the URL for job status verification", - "const checkUrl = `${serverURL}/api/v1/jobs/failed`;", - "", - "// Function to check the status of jobs", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"Checking jobs URL: \" + checkUrl);", - " const token = pm.collectionVariables.get('jwt'); ", - " // Send a GET request to fetch job statuses", - " pm.sendRequest({", - " url: checkUrl,", - " method: 'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job statuses:\", err);", - " return;", - " }", - "", - " let jsonData = response.json();", - " let jobs = jsonData.entity.jobs;", - "", - " if (jobs.length > 0) {", - " // Check if all jobs have the \"FAILED\" status", - " const allFailed = jobs.every(job => job.state === \"FAILED\");", - "", - " if (allFailed) {", - " // Postman test to validate that all jobs are in the \"FAILED\" state", - " pm.test(\"All jobs are in 'FAILED' state\", function () {", - " pm.expect(allFailed).to.be.true;", - " });", - " console.log(\"All jobs are in 'FAILED' state.\");", - " } else {", - " // If any job is not in the \"FAILED\" state", - " pm.test(\"Some jobs are not in 'FAILED' state\", function () {", - " pm.expect(allFailed).to.be.true; // This will fail if not all jobs are \"FAILED\"", - " });", - " console.warn(\"Not all jobs are in 'FAILED' state.\");", - " }", - " } else if (retriesLeft > 0) {", - " // If no jobs are found and retries are left, wait and retry", - " console.log(\"No jobs available, retries left: \" + retriesLeft);", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " } else {", - " // If no jobs and no retries are left", - " pm.test(\"Maximum retries reached, no jobs received.\", function () {", - " pm.expect(jobs.length).to.be.greaterThan(0); // This will fail if no jobs are found", - " });", - " console.warn(\"No jobs found after maximum retries.\");", - " }", - " });", - "}", + "// Get the expected job ID from collection variables", + "const jobId = pm.environment.get('failingJobId');", "", - "// Start job status check with the maximum number of retries", - "checkJobState(maxRetries);", - "" + "// Parse the response JSON", + "const response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains only one job", + "pm.test(\"Jobs array should contain only one job\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(1);", + "});", + "", + "// Validate that the job ID in the response matches the expected job ID", + "pm.test(\"Job ID should match expected job ID\", function () {", + " pm.expect(response.entity.jobs[0].id).to.eql(jobId);", + "});" ], "type": "text/javascript", "packages": {} @@ -1465,83 +1507,50 @@ "response": [] }, { - "name": "List Jobs Expect Fail and Cancelled", + "name": "List Jobs Expect Fail, Completed and Cancelled", "event": [ { "listen": "test", "script": { "exec": [ - "// Configuration", - "const maxRetries = 5; // Maximum number of retries", - "const retryDelay = 2000; // Delay between retries in milliseconds", - "", - "// Get server URL from environment variables", - "const serverURL = pm.environment.get('serverURL') || pm.collectionVariables.get('baseUrl'); // Use baseURL as fallback if serverURL is not defined", - "", - "// Define the URL to check job statuses", - "const checkUrl = `${serverURL}/api/v1/jobs`;", - "", - "// Function to check if there are jobs in \"FAILED\" or \"CANCELED\" state", - "function checkJobState(retriesLeft) {", - "", - " console.log(\"Checking jobs URL: \" + checkUrl);", - " const token = pm.collectionVariables.get(\"jwt\");", - " // Send a GET request to get the job statuses", - " pm.sendRequest({", - " url: checkUrl,", - " method: 'GET',", - " header: {", - " 'Authorization': `Bearer ${token}`, // Add Bearer token in Authorization header", - " 'Content-Type': 'application/json'", - " }", - " }, function (err, response) {", - " if (err) {", - " console.error(\"Error retrieving job statuses:\", err);", - " return;", - " }", - "", - " let jsonData = response.json();", - " let jobs = jsonData.entity.jobs;", - "", - " if (jobs.length > 0) {", - " // Check if there are jobs with \"FAILED\" and \"CANCELED\" status", - " const hasFailed = jobs.some(job => job.state === \"FAILED\");", - " const hasCanceled = jobs.some(job => job.state === \"CANCELED\");", - "", - " // Postman test to validate that there are jobs with \"FAILED\" statuses", - " pm.test(\"There are jobs in 'FAILED' state\", function () {", - " pm.expect(hasFailed).to.be.true; ", - " });", - "", - " // Postman test to validate that there are jobs with \"CANCELED\" statuses", - " pm.test(\"There are jobs in 'CANCELED' state\", function () { ", - " pm.expect(hasCanceled).to.be.true;", - " });", - "", - " if (hasFailed && hasCanceled) {", - " console.log(\"Found jobs in 'FAILED' and 'CANCELED' state.\");", - " } else {", - " console.warn(\"Did not find jobs in both 'FAILED' and 'CANCELED' states.\");", - " }", - " } else if (retriesLeft > 0) {", - " // If no jobs are found and retries are left, wait and retry", - " console.log(\"No jobs available, retries left: \" + retriesLeft);", - " setTimeout(function () {", - " checkJobState(retriesLeft - 1);", - " }, retryDelay);", - " } else {", - " // If no jobs are found and no retries are left", - " pm.test(\"Maximum retries reached, no jobs received.\", function () {", - " pm.expect(jobs.length).to.be.greaterThan(0); // This will fail if no jobs are found", - " });", - " console.warn(\"No jobs found after reaching maximum retries.\");", - " }", - " });", - "}", + "// Parse the response JSON", + "const response = pm.response.json();", "", - "// Start checking job statuses with the maximum number of retries", - "checkJobState(maxRetries);", - "" + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains jobs", + "pm.test(\"Jobs array should have data\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(3);", + "});", + "", + "// Check if there are jobs with \"FAILED\" and \"CANCELED\" status", + "const hasFailed = response.entity.jobs.some(job => job.state === \"FAILED\");", + "const hasCanceled = response.entity.jobs.some(job => job.state === \"CANCELED\");", + "const hasCompleted = response.entity.jobs.some(job => job.state === \"COMPLETED\");", + "", + "// Postman test to validate that there are jobs with \"FAILED\" statuses", + "pm.test(\"There are jobs in 'FAILED' state\", function () {", + " pm.expect(hasFailed).to.be.true; ", + "});", + "", + "// Postman test to validate that there are jobs with \"CANCELED\" statuses", + "pm.test(\"There are jobs in 'CANCELED' state\", function () { ", + " pm.expect(hasCanceled).to.be.true;", + "});", + "", + "// Postman test to validate that there are jobs with \"COMPLETED\" statuses", + "pm.test(\"There are jobs in 'COMPLETED' state\", function () { ", + " pm.expect(hasCompleted).to.be.true;", + "});" ], "type": "text/javascript", "packages": {} @@ -1693,6 +1702,14 @@ { "key": "jwt", "value": "" + }, + { + "key": "successJobId", + "value": "" + }, + { + "key": "failingJobId", + "value": "" } ] } \ No newline at end of file