From 2ed0951e28fd0c267aa5e07286cffa8aa210cfe3 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 5 Feb 2025 14:25:57 +0000 Subject: [PATCH 01/12] first commit --- .gitignore | 1 + gradle.properties | 2 +- .../com/transferwise/tasks/ITasksService.java | 13 ++++++ .../com/transferwise/tasks/TasksService.java | 43 +++++++++++++++++++ .../tasks/entrypoints/EntryPointsNames.java | 1 + .../tasks/helpers/CoreMetricsTemplate.java | 12 ++++++ .../tasks/helpers/ICoreMetricsTemplate.java | 5 +++ 7 files changed, 76 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 15cd1813..ca1ea9ed 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ build .idea /demoapp/src/main/resources/application-rds.yml +*.iml \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 64974ca5..5493b308 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.49.0 +version=1.49.1 org.gradle.internal.http.socketTimeout=120000 diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 065b618c..180ec143 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -151,4 +151,17 @@ enum TasksProcessingState { STARTED, STOPPED, STOP_IN_PROGRESS } + /** + * Cancels a task in WAITING state. + * + *

If the task is not found or not in WAITING state, false is returned. + */ + boolean cancelTask(CancelTaskRequest request); + + @Data + @Accessors(chain = true) + class CancelTaskRequest { + private UUID taskId; + private long version; + } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index c32b8077..d9e8bf8d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -289,6 +289,49 @@ public ITasksService.TasksProcessingState getTasksProcessingState(String bucketI return tasksExecutionTriggerer.getTasksProcessingState(bucketId); } + + @Override + @EntryPoint(usesExisting = true) + @Transactional(rollbackFor = Exception.class) + public boolean cancelTask(CancelTaskRequest request) { + return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.CANCEL_TASK, + () -> { + UUID taskId = request.getTaskId(); + mdcService.put(request.getTaskId(), request.getVersion()); + + FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); + + if (task == null) { + log.debug("Cannot cancel task '" + taskId + "' as it was not found."); + return false; + } + + mdcService.put(task); + + long version = task.getVersion(); + + if (version != request.getVersion()) { + coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); + return false; + } + + if (task.getStatus().equals(TaskStatus.WAITING.name())) { + if (!taskDao.deleteTask(taskId, version)) { + coreMetricsTemplate.registerTaskCancelledFailure(null, task.getType()); + return false; + } else { + coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + return true; + } + } + + coreMetricsTemplate.registerTaskCancelledFailure(null, task.getType()); + return false; + }); + } + + /** * We register an after commit hook here, so as soon as transaction has finished, the task will be triggered. * diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java index e95a6e15..52c63dd7 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java @@ -17,6 +17,7 @@ public final class EntryPointsNames { public static final String ASYNC_HANDLE_SUCCESS = "asyncHandleSuccess"; public static final String ASYNC_HANDLE_FAIL = "asyncHandleFail"; public static final String RESCHEDULE_TASK = "rescheduleTask"; + public static final String CANCEL_TASK = "cancelTask"; public static final String GET_TASK = "getTask"; private EntryPointsNames() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index ee96331d..3c4f5720 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -54,6 +54,8 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; + public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; + public static final String METRIC_TASKS_FAILED_CANCELLATION_COUNT = METRIC_PREFIX + "tasks.failedCancellationCount"; public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; public static final String GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; @@ -534,4 +536,14 @@ protected static class MetricHandle { private Meter meter; } + + public void registerTaskCancelled(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + .increment(); + } + + public void registerTaskCancelledFailure(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_FAILED_CANCELLATION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + .increment(); + } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index a11464d3..80ca6ef0 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -126,4 +126,9 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun @SuppressWarnings("rawtypes") void registerKafkaProducer(Producer producer); + + void registerTaskCancelled(String bucketId, String taskType); + + void registerTaskCancelledFailure(String bucketId, String taskType); + } From b1b4a5317abee87669b05ed8a5a3fb0d38551e16 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 5 Feb 2025 14:39:25 +0000 Subject: [PATCH 02/12] Rename to delete --- .../java/com/transferwise/tasks/ITasksService.java | 8 ++++---- .../java/com/transferwise/tasks/TasksService.java | 14 +++++++------- .../tasks/entrypoints/EntryPointsNames.java | 2 +- .../tasks/helpers/CoreMetricsTemplate.java | 12 ++++++------ .../tasks/helpers/ICoreMetricsTemplate.java | 4 ++-- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 180ec143..deedabd3 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -152,15 +152,15 @@ enum TasksProcessingState { } /** - * Cancels a task in WAITING state. + * Delete a task * - *

If the task is not found or not in WAITING state, false is returned. + *

If the task is not found, false is returned. */ - boolean cancelTask(CancelTaskRequest request); + boolean deleteTask(DeleteTaskRequest request); @Data @Accessors(chain = true) - class CancelTaskRequest { + class DeleteTaskRequest { private UUID taskId; private long version; } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index d9e8bf8d..1879a193 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -293,8 +293,8 @@ public ITasksService.TasksProcessingState getTasksProcessingState(String bucketI @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) - public boolean cancelTask(CancelTaskRequest request) { - return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.CANCEL_TASK, + public boolean deleteTask(deleteTaskRequest request) { + return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.DELETE_TASK, () -> { UUID taskId = request.getTaskId(); mdcService.put(request.getTaskId(), request.getVersion()); @@ -302,7 +302,7 @@ public boolean cancelTask(CancelTaskRequest request) { FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); if (task == null) { - log.debug("Cannot cancel task '" + taskId + "' as it was not found."); + log.debug("Cannot delete task '" + taskId + "' as it was not found."); return false; } @@ -311,22 +311,22 @@ public boolean cancelTask(CancelTaskRequest request) { long version = task.getVersion(); if (version != request.getVersion()) { - coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + coreMetricsTemplate.registerTaskDeleted(null, task.getType()); log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); return false; } if (task.getStatus().equals(TaskStatus.WAITING.name())) { if (!taskDao.deleteTask(taskId, version)) { - coreMetricsTemplate.registerTaskCancelledFailure(null, task.getType()); + coreMetricsTemplate.registerTaskDeletedFailure(null, task.getType()); return false; } else { - coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + coreMetricsTemplate.registerTaskDeleted(null, task.getType()); return true; } } - coreMetricsTemplate.registerTaskCancelledFailure(null, task.getType()); + coreMetricsTemplate.registerTaskDeletedFailure(null, task.getType()); return false; }); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java index 52c63dd7..3f910e14 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java @@ -17,7 +17,7 @@ public final class EntryPointsNames { public static final String ASYNC_HANDLE_SUCCESS = "asyncHandleSuccess"; public static final String ASYNC_HANDLE_FAIL = "asyncHandleFail"; public static final String RESCHEDULE_TASK = "rescheduleTask"; - public static final String CANCEL_TASK = "cancelTask"; + public static final String DELETE_TASK = "deleteTask"; public static final String GET_TASK = "getTask"; private EntryPointsNames() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 3c4f5720..0cb4b32f 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -54,8 +54,8 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; - public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; - public static final String METRIC_TASKS_FAILED_CANCELLATION_COUNT = METRIC_PREFIX + "tasks.failedCancellationCount"; + public static final String METRIC_TASKS_DELETED_COUNT = METRIC_PREFIX + "tasks.deletedCount"; + public static final String METRIC_TASKS_FAILED_DELETION_COUNT = METRIC_PREFIX + "tasks.failedDeletionCount"; public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; public static final String GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; @@ -537,13 +537,13 @@ protected static class MetricHandle { } - public void registerTaskCancelled(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + public void registerTaskDeleted(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_DELETED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) .increment(); } - public void registerTaskCancelledFailure(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_FAILED_CANCELLATION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + public void registerTaskDeletionFailure(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_FAILED_DELETION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) .increment(); } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index 80ca6ef0..8466dda8 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -127,8 +127,8 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun @SuppressWarnings("rawtypes") void registerKafkaProducer(Producer producer); - void registerTaskCancelled(String bucketId, String taskType); + void registerTaskDeleted(String bucketId, String taskType); - void registerTaskCancelledFailure(String bucketId, String taskType); + void registerTaskDeletedFailure(String bucketId, String taskType); } From d14cc9321ac2acd7ff737b4c447659d2a0e19f4a Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 5 Feb 2025 17:38:00 +0000 Subject: [PATCH 03/12] add cancellation --- .../db/changelog/mysql/V1.0__initialize.sql | 2 +- .../testapp/TasksManagementPortIntTest.java | 2 +- .../com/transferwise/tasks/ITasksService.java | 14 ++++++ .../com/transferwise/tasks/TasksService.java | 47 +++++++++++++++++-- .../tasks/cleaning/TasksCleaner.java | 2 +- .../transferwise/tasks/domain/TaskStatus.java | 1 + .../tasks/entrypoints/EntryPointsNames.java | 1 + .../tasks/helpers/CoreMetricsTemplate.java | 12 +++++ .../tasks/helpers/ICoreMetricsTemplate.java | 6 ++- .../db.tw-tasks-mysql-random-uuids.xml | 2 +- .../db/changelog/db.tw-tasks-mysql.xml | 2 +- 11 files changed, 82 insertions(+), 9 deletions(-) diff --git a/demoapp/src/main/resources/db/changelog/mysql/V1.0__initialize.sql b/demoapp/src/main/resources/db/changelog/mysql/V1.0__initialize.sql index 4ff07a8f..d0fffe7d 100644 --- a/demoapp/src/main/resources/db/changelog/mysql/V1.0__initialize.sql +++ b/demoapp/src/main/resources/db/changelog/mysql/V1.0__initialize.sql @@ -1,7 +1,7 @@ CREATE TABLE tw_task ( id BINARY(16) PRIMARY KEY NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED', 'CANCELLED'), -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 next_event_time DATETIME(6) NOT NULL, state_time DATETIME(3) NOT NULL, diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java index 3bf4ae97..ad20d9fb 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java @@ -85,7 +85,7 @@ void erroneousTasksWillBeCorrectlyFound() { } @Test - void crackerCantGetErronouseTasks() { + void crackerCantGetErroneousTasks() { ResponseEntity response = badEngineerTemplate().postForEntity("/v1/twTasks/getTasksInError", new ITasksManagementPort.GetTasksInErrorRequest().setMaxCount(1), GetTasksInErrorResponse.class ); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index deedabd3..0ab16582 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -164,4 +164,18 @@ class DeleteTaskRequest { private UUID taskId; private long version; } + + /** + * Cancel a task + * + *

If the task is not found, false is returned. + */ + boolean cancelTask(CancelTaskRequest request); + + @Data + @Accessors(chain = true) + class CancelTaskRequest { + private UUID taskId; + private long version; + } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 1879a193..b743294f 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -293,7 +293,7 @@ public ITasksService.TasksProcessingState getTasksProcessingState(String bucketI @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) - public boolean deleteTask(deleteTaskRequest request) { + public boolean deleteTask(DeleteTaskRequest request) { return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.DELETE_TASK, () -> { UUID taskId = request.getTaskId(); @@ -318,7 +318,7 @@ public boolean deleteTask(deleteTaskRequest request) { if (task.getStatus().equals(TaskStatus.WAITING.name())) { if (!taskDao.deleteTask(taskId, version)) { - coreMetricsTemplate.registerTaskDeletedFailure(null, task.getType()); + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); return false; } else { coreMetricsTemplate.registerTaskDeleted(null, task.getType()); @@ -326,12 +326,53 @@ public boolean deleteTask(deleteTaskRequest request) { } } - coreMetricsTemplate.registerTaskDeletedFailure(null, task.getType()); + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); return false; }); } + @Override + @EntryPoint(usesExisting = true) + @Transactional(rollbackFor = Exception.class) + public boolean cancelTask(CancelTaskRequest request) { + return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.CANCEL_TASK, + () -> { + UUID taskId = request.getTaskId(); + mdcService.put(request.getTaskId(), request.getVersion()); + + FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); + + if (task == null) { + log.debug("Cannot cancel task '" + taskId + "' as it was not found."); + return false; + } + + mdcService.put(task); + + long version = task.getVersion(); + + if (version != request.getVersion()) { + coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); + return false; + } + + if (task.getStatus().equals(TaskStatus.WAITING.name())) { + if (!taskDao.setStatus(taskId, TaskStatus.CANCELLED, version)) { + coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); + return false; + } else { + coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + return true; + } + } + + coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); + return false; + }); + } + /** * We register an after commit hook here, so as soon as transaction has finished, the task will be triggered. * diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/cleaning/TasksCleaner.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/cleaning/TasksCleaner.java index e38689ad..57f9b757 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/cleaning/TasksCleaner.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/cleaning/TasksCleaner.java @@ -49,7 +49,7 @@ public class TasksCleaner implements ITasksCleaner, GracefulShutdownStrategy, In public void afterPropertiesSet() { String nodePath = "/tw/tw_tasks/" + tasksProperties.getGroupId() + "/tasks_cleaner"; - TaskStatus[] statuses = {TaskStatus.DONE, TaskStatus.FAILED}; + TaskStatus[] statuses = {TaskStatus.DONE, TaskStatus.FAILED, TaskStatus.CANCELLED}; for (TaskStatus status : statuses) { DeletableStatus deletableStatus = new DeletableStatus(); deletableStatus.status = status; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskStatus.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskStatus.java index 29079327..20c5713f 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskStatus.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskStatus.java @@ -8,5 +8,6 @@ public enum TaskStatus { DONE, ERROR, // Generates alerts FAILED, // ERROR is acked, alerts are off + CANCELLED, // User requested cancellation, no alerts UNKNOWN // For metrics, if getting task status is too expensive } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java index 3f910e14..111de27d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java @@ -18,6 +18,7 @@ public final class EntryPointsNames { public static final String ASYNC_HANDLE_FAIL = "asyncHandleFail"; public static final String RESCHEDULE_TASK = "rescheduleTask"; public static final String DELETE_TASK = "deleteTask"; + public static final String CANCEL_TASK = "cancelTask"; public static final String GET_TASK = "getTask"; private EntryPointsNames() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 0cb4b32f..80ee12bd 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -56,6 +56,8 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; public static final String METRIC_TASKS_DELETED_COUNT = METRIC_PREFIX + "tasks.deletedCount"; public static final String METRIC_TASKS_FAILED_DELETION_COUNT = METRIC_PREFIX + "tasks.failedDeletionCount"; + public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; + public static final String METRIC_TASKS_FAILED_CANCELLATION_COUNT = METRIC_PREFIX + "tasks.failedCancellationCount"; public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; public static final String GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; @@ -546,4 +548,14 @@ public void registerTaskDeletionFailure(String bucketId, String taskType) { meterCache.counter(METRIC_TASKS_FAILED_DELETION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) .increment(); } + + public void registerTaskCancelled(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + .increment(); + } + + public void registerTaskCancellationFailure(String bucketId, String taskType) { + meterCache.counter(METRIC_TASKS_FAILED_CANCELLATION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + .increment(); + } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index 8466dda8..f54832a4 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -129,6 +129,10 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun void registerTaskDeleted(String bucketId, String taskType); - void registerTaskDeletedFailure(String bucketId, String taskType); + void registerTaskDeletionFailure(String bucketId, String taskType); + + void registerTaskCancelled(String bucketId, String taskType); + + void registerTaskCancellationFailure(String bucketId, String taskType); } diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql-random-uuids.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql-random-uuids.xml index d0a59aac..8d29acc1 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql-random-uuids.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql-random-uuids.xml @@ -15,7 +15,7 @@ CREATE TABLE tw_task ( aid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT, id BINARY(16) NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED', 'CANCELLED'), -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 next_event_time DATETIME(6) NOT NULL, state_time DATETIME(3) NOT NULL, diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml index 00628713..a46b90a0 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml @@ -9,7 +9,7 @@ CREATE TABLE tw_task ( id BINARY(16) PRIMARY KEY NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED', 'CANCELLED'), -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 next_event_time DATETIME(6) NOT NULL, state_time DATETIME(6) NOT NULL, From 708c18aa46c27c61cd2559ff03ae3ed78c2fb275 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Thu, 6 Feb 2025 13:11:19 +0000 Subject: [PATCH 04/12] add tests --- .../testapp/TaskCancellationIntTest.java | 177 ++++++++++++++++++ .../tasks/testapp/TaskDeletionIntTest.java | 177 ++++++++++++++++++ .../com/transferwise/tasks/TasksService.java | 17 +- .../tasks/helpers/CoreMetricsTemplate.java | 2 +- 4 files changed, 361 insertions(+), 12 deletions(-) create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java new file mode 100644 index 00000000..2c969aa6 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java @@ -0,0 +1,177 @@ +package com.transferwise.tasks.testapp; + +import static com.transferwise.tasks.domain.TaskStatus.WAITING; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITaskDataSerializer; +import com.transferwise.tasks.ITasksService; +import com.transferwise.tasks.ITasksService.GetTaskRequest; +import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskStatus; +import com.transferwise.tasks.test.ITestTasksService; +import io.micrometer.core.instrument.Counter; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TaskCancellationIntTest extends BaseIntTest { + + @Autowired + private ITasksService tasksService; + @Autowired + private ITestTasksService testTasksService; + @Autowired + private ITaskDataSerializer taskDataSerializer; + @Autowired + private ITaskDao taskDao; + + @BeforeEach + void setup() { + transactionsHelper.withTransaction().asNew().call(() -> { + testTasksService.reset(); + return null; + }); + } + + @Test + void taskCanBeSuccessfullyCancelled() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be cancelled")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertTrue(transactionsHelper.withTransaction().asNew().call(() -> + tasksService.cancelTask( + new ITasksService.CancelTaskRequest() + .setTaskId(taskId) + .setVersion(task.getVersion()) + )) + ); + + await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); + await().until(() -> resultRegisteringSyncTaskProcessor.getTaskResults().get(taskId) != null); + assertEquals(0, getFailedCancellationCount()); + assertEquals(1, getTaskCancelledCount()); + } + + @Test + void taskWillNotBeCancelledIfVersionHasAlreadyChanged() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedCancellationCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be cancelled too!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertFalse( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.cancelTask( + new ITasksService.CancelTaskRequest() + .setTaskId(taskId) + .setVersion(task.getVersion() - 1) + ) + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedCancellationCount()); + assertEquals(0, getTaskCancelledCount()); + } + + @ParameterizedTest + @EnumSource(value = TaskStatus.class, + names = {"WAITING", "UNKNOWN"}, + mode = EnumSource.Mode.EXCLUDE) + void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedCancellationCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I do not want to be cancelled!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + List tasks = testTasksService.getWaitingTasks("test", null); + Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) + ); + + await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty()); + + var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + taskDao.setStatus(taskId, status, updateTask.getVersion()); + + var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertFalse( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.cancelTask( + new ITasksService.CancelTaskRequest() + .setTaskId(taskId) + .setVersion(finalTask.getVersion()) + ) + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedCancellationCount()); + assertEquals(0, getTaskCancelledCount()); + } + + private long getFailedCancellationCount() { + Counter counter = meterRegistry.find("twTasks.tasks.failedCancellationCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } + + private long getTaskCancelledCount() { + Counter counter = meterRegistry.find("twTasks.tasks.cancelledCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java new file mode 100644 index 00000000..43ed36e5 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java @@ -0,0 +1,177 @@ +package com.transferwise.tasks.testapp; + +import static com.transferwise.tasks.domain.TaskStatus.WAITING; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITaskDataSerializer; +import com.transferwise.tasks.ITasksService; +import com.transferwise.tasks.ITasksService.GetTaskRequest; +import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskStatus; +import com.transferwise.tasks.test.ITestTasksService; +import io.micrometer.core.instrument.Counter; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TaskDeletionIntTest extends BaseIntTest { + + @Autowired + private ITasksService tasksService; + @Autowired + private ITestTasksService testTasksService; + @Autowired + private ITaskDataSerializer taskDataSerializer; + @Autowired + private ITaskDao taskDao; + + @BeforeEach + void setup() { + transactionsHelper.withTransaction().asNew().call(() -> { + testTasksService.reset(); + return null; + }); + } + + @Test + void taskCanBeSuccessfullyDeleted() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be deleted")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertTrue(transactionsHelper.withTransaction().asNew().call(() -> + tasksService.deleteTask( + new ITasksService.DeleteTaskRequest() + .setTaskId(taskId) + .setVersion(task.getVersion()) + ) + )); + + await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); + await().until(() -> resultRegisteringSyncTaskProcessor.getTaskResults().get(taskId) != null); + assertEquals(0, getFailedDeletionCount()); + assertEquals(1, getTaskDeletedCount()); + } + + @Test + void taskWillNotBeDeletedIfVersionHasAlreadyChanged() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be deleted too!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertFalse( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.deleteTask( + new ITasksService.DeleteTaskRequest() + .setTaskId(taskId) + .setVersion(task.getVersion() - 1) + ) + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount()); + assertEquals(0, getTaskDeletedCount()); + } + + @ParameterizedTest + @EnumSource(value = TaskStatus.class, + names = {"WAITING", "UNKNOWN"}, + mode = EnumSource.Mode.EXCLUDE) + void taskWillBeDeletedIfWaiting(TaskStatus status) { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I do want to be deleted!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + List tasks = testTasksService.getWaitingTasks("test", null); + Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) + ); + + await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty()); + + var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + taskDao.setStatus(taskId, status, updateTask.getVersion()); + + var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); + + assertTrue( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.deleteTask( + new ITasksService.DeleteTaskRequest() + .setTaskId(taskId) + .setVersion(finalTask.getVersion()) + ) + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount()); + assertEquals(0, getTaskDeletedCount()); + } + + private long getFailedDeletionCount() { + Counter counter = meterRegistry.find("twTasks.tasks.failedDeletionCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } + + private long getTaskDeletedCount() { + Counter counter = meterRegistry.find("twTasks.tasks.deletedCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index b743294f..0939014a 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -316,18 +316,13 @@ public boolean deleteTask(DeleteTaskRequest request) { return false; } - if (task.getStatus().equals(TaskStatus.WAITING.name())) { - if (!taskDao.deleteTask(taskId, version)) { - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - return false; - } else { - coreMetricsTemplate.registerTaskDeleted(null, task.getType()); - return true; - } + if (!taskDao.deleteTask(taskId, version)) { + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); + return false; + } else { + coreMetricsTemplate.registerTaskDeleted(null, task.getType()); + return true; } - - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - return false; }); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 80ee12bd..8555ec5e 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -54,9 +54,9 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; + public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; public static final String METRIC_TASKS_DELETED_COUNT = METRIC_PREFIX + "tasks.deletedCount"; public static final String METRIC_TASKS_FAILED_DELETION_COUNT = METRIC_PREFIX + "tasks.failedDeletionCount"; - public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; public static final String METRIC_TASKS_FAILED_CANCELLATION_COUNT = METRIC_PREFIX + "tasks.failedCancellationCount"; public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; From 6206078e586393c206a96e5ef465752983e9a0c3 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Thu, 6 Feb 2025 13:16:53 +0000 Subject: [PATCH 05/12] changelog --- CHANGELOG.md | 9 ++++++++- docs/support.md | 2 +- tw-tasks-kafka-listener/build.gradle | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7316ab67..413c799d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 1.49.1 - 2025/02/07 + +### Changed + +- Added support to cancel tasks in progress and to delete tasks. + + ## 1.49.0 - 2025/01/08 ### Changed @@ -18,7 +25,7 @@ It is worth keeping an eye on: - changes to assignors used, log `Successfully synced group in generation Generation` - on assignment strategy failures on consumers in prod and [consumer state](https://dashboards.tw.ee/d/f7094f30-a509-4592-aced-37584a70132a/kafka-consumer-groups-and-lag-details-kminion?orgId=1&refresh=30s&viewPanel=14). -If you use `com.wise.kafka.assignors.CanaryAwareRangeAssignor`, consider setting this config: +If you use `com.wise.kafka.assignors.CanaryAwareRangeAssignor`, consider setting this config: ``` spring.kafka.consumer.properties.partition.assignment.strategy: com.wise.kafka.assignors.CanaryAwareRangeAssignor, org.apache.kafka.clients.consumer.RangeAssignor, org.apache.kafka.clients.consumer.CooperativeStickyAssignor diff --git a/docs/support.md b/docs/support.md index 9a4ba505..b312e5b8 100644 --- a/docs/support.md +++ b/docs/support.md @@ -1,7 +1,7 @@ # Support ## Support for integration tests -For making integration testing easy and fun, the following support classes can be used. They are especially convinient for +For making integration testing easy and fun, the following support classes can be used. They are especially convenient for tasks with Json payload. - `IToKafkaTestHelper` diff --git a/tw-tasks-kafka-listener/build.gradle b/tw-tasks-kafka-listener/build.gradle index aaa0f5a3..67dcfddc 100644 --- a/tw-tasks-kafka-listener/build.gradle +++ b/tw-tasks-kafka-listener/build.gradle @@ -1,7 +1,7 @@ ext.projectName = "TwTasks Extension - Kafka Listener" ext.projectDescription = ''' Set of components that simplify consumption of kafka messages from arbitrary topic -with further convertion of those to task instances. While providing sensible defaults +with further converting of those to task instances. While providing sensible defaults the extension is not as flexible as kafka integration oriented libraries, such as spring-kafka, consider using those instead if more flexibility is needed. From d06285b6b7f69d4b5c0525c77e68ea593e521179 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Thu, 6 Feb 2025 16:18:37 +0000 Subject: [PATCH 06/12] fi test --- CHANGELOG.md | 2 +- .../testapp/TaskCancellationIntTest.java | 12 ++++-------- .../tasks/testapp/TaskDeletionIntTest.java | 19 +++++++------------ .../com/transferwise/tasks/ITasksService.java | 2 +- .../com/transferwise/tasks/TasksService.java | 4 ++-- tw-tasks-kafka-listener/build.gradle | 2 +- 6 files changed, 16 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 413c799d..e48b9b92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed -- Added support to cancel tasks in progress and to delete tasks. +- Added support to cancel tasks in waiting and to delete tasks. ## 1.49.0 - 2025/01/08 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java index 2c969aa6..651f4c72 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java @@ -48,7 +48,6 @@ void setup() { @Test void taskCanBeSuccessfullyCancelled() { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); UUID taskId = UuidUtils.generatePrefixCombUuid(); transactionsHelper.withTransaction().asNew().call(() -> @@ -71,15 +70,13 @@ void taskCanBeSuccessfullyCancelled() { ); await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); - await().until(() -> resultRegisteringSyncTaskProcessor.getTaskResults().get(taskId) != null); assertEquals(0, getFailedCancellationCount()); assertEquals(1, getTaskCancelledCount()); } @Test void taskWillNotBeCancelledIfVersionHasAlreadyChanged() { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); - final long initialFailedNextEventTimeChangeCount = getFailedCancellationCount(); + final long initialFailedCancellationCount = getFailedCancellationCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); transactionsHelper.withTransaction().asNew().call(() -> @@ -102,7 +99,7 @@ void taskWillNotBeCancelledIfVersionHasAlreadyChanged() { ) ) ); - assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedCancellationCount()); + assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount()); assertEquals(0, getTaskCancelledCount()); } @@ -111,8 +108,7 @@ void taskWillNotBeCancelledIfVersionHasAlreadyChanged() { names = {"WAITING", "UNKNOWN"}, mode = EnumSource.Mode.EXCLUDE) void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); - final long initialFailedNextEventTimeChangeCount = getFailedCancellationCount(); + final long initialFailedCancellationCount = getFailedCancellationCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); transactionsHelper.withTransaction().asNew().call(() -> @@ -147,7 +143,7 @@ void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) { ) ) ); - assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedCancellationCount()); + assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount()); assertEquals(0, getTaskCancelledCount()); } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java index 43ed36e5..85b5c6ee 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java @@ -48,7 +48,6 @@ void setup() { @Test void taskCanBeSuccessfullyDeleted() { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); UUID taskId = UuidUtils.generatePrefixCombUuid(); transactionsHelper.withTransaction().asNew().call(() -> @@ -71,14 +70,12 @@ void taskCanBeSuccessfullyDeleted() { )); await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); - await().until(() -> resultRegisteringSyncTaskProcessor.getTaskResults().get(taskId) != null); assertEquals(0, getFailedDeletionCount()); assertEquals(1, getTaskDeletedCount()); } @Test void taskWillNotBeDeletedIfVersionHasAlreadyChanged() { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); @@ -102,24 +99,24 @@ void taskWillNotBeDeletedIfVersionHasAlreadyChanged() { ) ) ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount()); assertEquals(0, getTaskDeletedCount()); } @ParameterizedTest @EnumSource(value = TaskStatus.class, - names = {"WAITING", "UNKNOWN"}, + names = {"UNKNOWN"}, mode = EnumSource.Mode.EXCLUDE) - void taskWillBeDeletedIfWaiting(TaskStatus status) { - testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + void taskWillBeDeletedForAnyStatus(TaskStatus status) { final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); transactionsHelper.withTransaction().asNew().call(() -> tasksService.addTask(new ITasksService.AddTaskRequest() .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I do want to be deleted!")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))) + .setData(taskDataSerializer.serialize("I want to be deleted!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusDays(12))) ); await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); @@ -130,8 +127,6 @@ void taskWillBeDeletedIfWaiting(TaskStatus status) { tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) ); - await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty()); - var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); taskDao.setStatus(taskId, status, updateTask.getVersion()); @@ -147,8 +142,8 @@ void taskWillBeDeletedIfWaiting(TaskStatus status) { ) ) ); - assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount()); - assertEquals(0, getTaskDeletedCount()); + assertEquals(initialFailedNextEventTimeChangeCount, getFailedDeletionCount()); + assertEquals(1, getTaskDeletedCount()); } private long getFailedDeletionCount() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 0ab16582..ffda84a1 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -168,7 +168,7 @@ class DeleteTaskRequest { /** * Cancel a task * - *

If the task is not found, false is returned. + *

If the task is not found or not in WAITING state, false is returned. */ boolean cancelTask(CancelTaskRequest request); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 0939014a..7983e350 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -311,7 +311,7 @@ public boolean deleteTask(DeleteTaskRequest request) { long version = task.getVersion(); if (version != request.getVersion()) { - coreMetricsTemplate.registerTaskDeleted(null, task.getType()); + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); return false; } @@ -348,7 +348,7 @@ public boolean cancelTask(CancelTaskRequest request) { long version = task.getVersion(); if (version != request.getVersion()) { - coreMetricsTemplate.registerTaskCancelled(null, task.getType()); + coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); return false; } diff --git a/tw-tasks-kafka-listener/build.gradle b/tw-tasks-kafka-listener/build.gradle index 67dcfddc..aaa0f5a3 100644 --- a/tw-tasks-kafka-listener/build.gradle +++ b/tw-tasks-kafka-listener/build.gradle @@ -1,7 +1,7 @@ ext.projectName = "TwTasks Extension - Kafka Listener" ext.projectDescription = ''' Set of components that simplify consumption of kafka messages from arbitrary topic -with further converting of those to task instances. While providing sensible defaults +with further convertion of those to task instances. While providing sensible defaults the extension is not as flexible as kafka integration oriented libraries, such as spring-kafka, consider using those instead if more flexibility is needed. From 73b2b3333ac18c74aba825aaa35358e8dbcd2b31 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Thu, 6 Feb 2025 16:38:35 +0000 Subject: [PATCH 07/12] fi test --- .../tasks/testapp/TaskDeletionIntTest.java | 4 ++-- .../com/transferwise/tasks/ITasksService.java | 2 +- .../com/transferwise/tasks/TasksService.java | 17 +++++++++++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java index 85b5c6ee..9f671564 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java @@ -106,9 +106,9 @@ void taskWillNotBeDeletedIfVersionHasAlreadyChanged() { @ParameterizedTest @EnumSource(value = TaskStatus.class, - names = {"UNKNOWN"}, + names = {"PROCESSING", "UNKNOWN"}, mode = EnumSource.Mode.EXCLUDE) - void taskWillBeDeletedForAnyStatus(TaskStatus status) { + void taskWillBeDeletedForAnyStatusExceptProcessing(TaskStatus status) { final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index ffda84a1..2c391218 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -154,7 +154,7 @@ enum TasksProcessingState { /** * Delete a task * - *

If the task is not found, false is returned. + *

If the task is not found or in PROCESSING state, false is returned. */ boolean deleteTask(DeleteTaskRequest request); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 7983e350..686e9127 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -316,13 +316,18 @@ public boolean deleteTask(DeleteTaskRequest request) { return false; } - if (!taskDao.deleteTask(taskId, version)) { - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - return false; - } else { - coreMetricsTemplate.registerTaskDeleted(null, task.getType()); - return true; + if (!task.getStatus().equals(TaskStatus.PROCESSING.name())) { + if (!taskDao.deleteTask(taskId, version)) { + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); + return false; + } else { + coreMetricsTemplate.registerTaskDeleted(null, task.getType()); + return true; + } } + + coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); + return false; }); } From 8e6cdd6f2a990f158616fceb7470c29b36303b63 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Fri, 7 Feb 2025 09:53:45 +0000 Subject: [PATCH 08/12] more meaninigful assertions --- .../transferwise/tasks/testapp/TaskCancellationIntTest.java | 4 ++-- .../com/transferwise/tasks/testapp/TaskDeletionIntTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java index 651f4c72..e89abd8e 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java @@ -1,6 +1,6 @@ package com.transferwise.tasks.testapp; -import static com.transferwise.tasks.domain.TaskStatus.WAITING; +import static com.transferwise.tasks.domain.TaskStatus.CANCELLED; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -69,7 +69,7 @@ void taskCanBeSuccessfullyCancelled() { )) ); - await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); + await().until(() -> !testTasksService.getTasks("test", null, CANCELLED).isEmpty()); assertEquals(0, getFailedCancellationCount()); assertEquals(1, getTaskCancelledCount()); } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java index 9f671564..28d498ea 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java @@ -1,6 +1,5 @@ package com.transferwise.tasks.testapp; -import static com.transferwise.tasks.domain.TaskStatus.WAITING; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -11,6 +10,7 @@ import com.transferwise.tasks.ITaskDataSerializer; import com.transferwise.tasks.ITasksService; import com.transferwise.tasks.ITasksService.GetTaskRequest; +import com.transferwise.tasks.ITasksService.GetTaskResponse.Result; import com.transferwise.tasks.dao.ITaskDao; import com.transferwise.tasks.domain.Task; import com.transferwise.tasks.domain.TaskStatus; @@ -69,7 +69,7 @@ void taskCanBeSuccessfullyDeleted() { ) )); - await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); + await().until(() -> tasksService.getTask(new GetTaskRequest().setTaskId(taskId)).getResult().equals(Result.NOT_FOUND)); assertEquals(0, getFailedDeletionCount()); assertEquals(1, getTaskDeletedCount()); } From 050dbf7ee7799a1be3ce30efe61d1b3ff28e738c Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Tue, 11 Feb 2025 14:33:42 +0000 Subject: [PATCH 09/12] remove deleting tasks --- CHANGELOG.md | 2 +- .../tasks/testapp/TaskDeletionIntTest.java | 172 ------------------ .../com/transferwise/tasks/ITasksService.java | 14 -- .../com/transferwise/tasks/TasksService.java | 43 ----- .../tasks/entrypoints/EntryPointsNames.java | 1 - .../tasks/helpers/CoreMetricsTemplate.java | 12 -- .../tasks/helpers/ICoreMetricsTemplate.java | 4 - 7 files changed, 1 insertion(+), 247 deletions(-) delete mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e48b9b92..3ce9e2df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed -- Added support to cancel tasks in waiting and to delete tasks. +- Added support to cancel tasks in waiting state. ## 1.49.0 - 2025/01/08 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java deleted file mode 100644 index 28d498ea..00000000 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDeletionIntTest.java +++ /dev/null @@ -1,172 +0,0 @@ -package com.transferwise.tasks.testapp; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.transferwise.common.baseutils.UuidUtils; -import com.transferwise.tasks.BaseIntTest; -import com.transferwise.tasks.ITaskDataSerializer; -import com.transferwise.tasks.ITasksService; -import com.transferwise.tasks.ITasksService.GetTaskRequest; -import com.transferwise.tasks.ITasksService.GetTaskResponse.Result; -import com.transferwise.tasks.dao.ITaskDao; -import com.transferwise.tasks.domain.Task; -import com.transferwise.tasks.domain.TaskStatus; -import com.transferwise.tasks.test.ITestTasksService; -import io.micrometer.core.instrument.Counter; -import java.time.ZonedDateTime; -import java.util.List; -import java.util.UUID; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.springframework.beans.factory.annotation.Autowired; - -@Slf4j -public class TaskDeletionIntTest extends BaseIntTest { - - @Autowired - private ITasksService tasksService; - @Autowired - private ITestTasksService testTasksService; - @Autowired - private ITaskDataSerializer taskDataSerializer; - @Autowired - private ITaskDao taskDao; - - @BeforeEach - void setup() { - transactionsHelper.withTransaction().asNew().call(() -> { - testTasksService.reset(); - return null; - }); - } - - @Test - void taskCanBeSuccessfullyDeleted() { - UUID taskId = UuidUtils.generatePrefixCombUuid(); - - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I want to be deleted")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) - ); - - await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); - - var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); - - assertTrue(transactionsHelper.withTransaction().asNew().call(() -> - tasksService.deleteTask( - new ITasksService.DeleteTaskRequest() - .setTaskId(taskId) - .setVersion(task.getVersion()) - ) - )); - - await().until(() -> tasksService.getTask(new GetTaskRequest().setTaskId(taskId)).getResult().equals(Result.NOT_FOUND)); - assertEquals(0, getFailedDeletionCount()); - assertEquals(1, getTaskDeletedCount()); - } - - @Test - void taskWillNotBeDeletedIfVersionHasAlreadyChanged() { - final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); - final UUID taskId = UuidUtils.generatePrefixCombUuid(); - - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I want to be deleted too!")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) - ); - - await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); - - var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); - - assertFalse( - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.deleteTask( - new ITasksService.DeleteTaskRequest() - .setTaskId(taskId) - .setVersion(task.getVersion() - 1) - ) - ) - ); - - assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount()); - assertEquals(0, getTaskDeletedCount()); - } - - @ParameterizedTest - @EnumSource(value = TaskStatus.class, - names = {"PROCESSING", "UNKNOWN"}, - mode = EnumSource.Mode.EXCLUDE) - void taskWillBeDeletedForAnyStatusExceptProcessing(TaskStatus status) { - final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount(); - final UUID taskId = UuidUtils.generatePrefixCombUuid(); - - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I want to be deleted!")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusDays(12))) - ); - - await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); - List tasks = testTasksService.getWaitingTasks("test", null); - Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow(); - - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) - ); - - var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); - - taskDao.setStatus(taskId, status, updateTask.getVersion()); - - var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); - - assertTrue( - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.deleteTask( - new ITasksService.DeleteTaskRequest() - .setTaskId(taskId) - .setVersion(finalTask.getVersion()) - ) - ) - ); - assertEquals(initialFailedNextEventTimeChangeCount, getFailedDeletionCount()); - assertEquals(1, getTaskDeletedCount()); - } - - private long getFailedDeletionCount() { - Counter counter = meterRegistry.find("twTasks.tasks.failedDeletionCount").tags( - "taskType", "test" - ).counter(); - - if (counter == null) { - return 0; - } else { - return (long) counter.count(); - } - } - - private long getTaskDeletedCount() { - Counter counter = meterRegistry.find("twTasks.tasks.deletedCount").tags( - "taskType", "test" - ).counter(); - - if (counter == null) { - return 0; - } else { - return (long) counter.count(); - } - } -} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 2c391218..2ed41a72 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -151,20 +151,6 @@ enum TasksProcessingState { STARTED, STOPPED, STOP_IN_PROGRESS } - /** - * Delete a task - * - *

If the task is not found or in PROCESSING state, false is returned. - */ - boolean deleteTask(DeleteTaskRequest request); - - @Data - @Accessors(chain = true) - class DeleteTaskRequest { - private UUID taskId; - private long version; - } - /** * Cancel a task * diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 686e9127..62cb1274 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -289,49 +289,6 @@ public ITasksService.TasksProcessingState getTasksProcessingState(String bucketI return tasksExecutionTriggerer.getTasksProcessingState(bucketId); } - - @Override - @EntryPoint(usesExisting = true) - @Transactional(rollbackFor = Exception.class) - public boolean deleteTask(DeleteTaskRequest request) { - return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.DELETE_TASK, - () -> { - UUID taskId = request.getTaskId(); - mdcService.put(request.getTaskId(), request.getVersion()); - - FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); - - if (task == null) { - log.debug("Cannot delete task '" + taskId + "' as it was not found."); - return false; - } - - mdcService.put(task); - - long version = task.getVersion(); - - if (version != request.getVersion()) { - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); - return false; - } - - if (!task.getStatus().equals(TaskStatus.PROCESSING.name())) { - if (!taskDao.deleteTask(taskId, version)) { - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - return false; - } else { - coreMetricsTemplate.registerTaskDeleted(null, task.getType()); - return true; - } - } - - coreMetricsTemplate.registerTaskDeletionFailure(null, task.getType()); - return false; - }); - } - - @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java index 111de27d..52c63dd7 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsNames.java @@ -17,7 +17,6 @@ public final class EntryPointsNames { public static final String ASYNC_HANDLE_SUCCESS = "asyncHandleSuccess"; public static final String ASYNC_HANDLE_FAIL = "asyncHandleFail"; public static final String RESCHEDULE_TASK = "rescheduleTask"; - public static final String DELETE_TASK = "deleteTask"; public static final String CANCEL_TASK = "cancelTask"; public static final String GET_TASK = "getTask"; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 8555ec5e..c2380c68 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -55,8 +55,6 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; public static final String METRIC_TASKS_CANCELLED_COUNT = METRIC_PREFIX + "tasks.cancelledCount"; - public static final String METRIC_TASKS_DELETED_COUNT = METRIC_PREFIX + "tasks.deletedCount"; - public static final String METRIC_TASKS_FAILED_DELETION_COUNT = METRIC_PREFIX + "tasks.failedDeletionCount"; public static final String METRIC_TASKS_FAILED_CANCELLATION_COUNT = METRIC_PREFIX + "tasks.failedCancellationCount"; public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; @@ -539,16 +537,6 @@ protected static class MetricHandle { } - public void registerTaskDeleted(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_DELETED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) - .increment(); - } - - public void registerTaskDeletionFailure(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_FAILED_DELETION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) - .increment(); - } - public void registerTaskCancelled(String bucketId, String taskType) { meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) .increment(); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index f54832a4..240e739b 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -127,10 +127,6 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun @SuppressWarnings("rawtypes") void registerKafkaProducer(Producer producer); - void registerTaskDeleted(String bucketId, String taskType); - - void registerTaskDeletionFailure(String bucketId, String taskType); - void registerTaskCancelled(String bucketId, String taskType); void registerTaskCancellationFailure(String bucketId, String taskType); From 372dd0a2e2d4891f071aae6f7027c755d10b933b Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 12 Feb 2025 17:06:13 +0000 Subject: [PATCH 10/12] CR comments --- CHANGELOG.md | 2 +- gradle.properties | 2 +- .../testapp/TaskCancellationIntTest.java | 80 ++++++++----------- .../com/transferwise/tasks/ITasksService.java | 14 ---- .../com/transferwise/tasks/TasksService.java | 41 ---------- .../tasks/domain/TaskVersionId.java | 1 + .../tasks/helpers/CoreMetricsTemplate.java | 8 +- .../tasks/helpers/ICoreMetricsTemplate.java | 4 +- .../management/ITasksManagementService.java | 13 +++ .../management/TasksManagementService.java | 43 ++++++++++ 10 files changed, 98 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ce9e2df..caecebec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 1.49.1 - 2025/02/07 +## 1.50.0 - 2025/02/07 ### Changed diff --git a/gradle.properties b/gradle.properties index 5493b308..612630bf 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.49.1 +version=1.50.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java index e89abd8e..a23aa131 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskCancellationIntTest.java @@ -14,6 +14,8 @@ import com.transferwise.tasks.dao.ITaskDao; import com.transferwise.tasks.domain.Task; import com.transferwise.tasks.domain.TaskStatus; +import com.transferwise.tasks.domain.TaskVersionId; +import com.transferwise.tasks.management.ITasksManagementService; import com.transferwise.tasks.test.ITestTasksService; import io.micrometer.core.instrument.Counter; import java.time.ZonedDateTime; @@ -37,36 +39,32 @@ public class TaskCancellationIntTest extends BaseIntTest { private ITaskDataSerializer taskDataSerializer; @Autowired private ITaskDao taskDao; + @Autowired + private ITasksManagementService tasksManagementService; @BeforeEach void setup() { - transactionsHelper.withTransaction().asNew().call(() -> { - testTasksService.reset(); - return null; - }); + testTasksService.reset(); } @Test void taskCanBeSuccessfullyCancelled() { UUID taskId = UuidUtils.generatePrefixCombUuid(); - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I want to be cancelled")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) - ); + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be cancelled")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))); await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); - assertTrue(transactionsHelper.withTransaction().asNew().call(() -> - tasksService.cancelTask( - new ITasksService.CancelTaskRequest() - .setTaskId(taskId) - .setVersion(task.getVersion()) - )) + assertTrue( + tasksManagementService.cancelTask( + new ITasksManagementService.CancelTaskRequest() + .setTaskVersionId(new TaskVersionId().setId(taskId).setVersion(task.getVersion())) + ) ); await().until(() -> !testTasksService.getTasks("test", null, CANCELLED).isEmpty()); @@ -79,26 +77,21 @@ void taskWillNotBeCancelledIfVersionHasAlreadyChanged() { final long initialFailedCancellationCount = getFailedCancellationCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I want to be cancelled too!")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) - ); + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be cancelled too!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))); await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); assertFalse( - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.cancelTask( - new ITasksService.CancelTaskRequest() - .setTaskId(taskId) - .setVersion(task.getVersion() - 1) - ) - ) - ); + tasksManagementService.cancelTask( + new ITasksManagementService.CancelTaskRequest() + .setTaskVersionId(new TaskVersionId().setId(taskId).setVersion(task.getVersion() - 1)) + )); + assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount()); assertEquals(0, getTaskCancelledCount()); } @@ -111,20 +104,16 @@ void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) { final long initialFailedCancellationCount = getFailedCancellationCount(); final UUID taskId = UuidUtils.generatePrefixCombUuid(); - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.addTask(new ITasksService.AddTaskRequest() - .setTaskId(taskId) - .setData(taskDataSerializer.serialize("I do not want to be cancelled!")) - .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))) - ); + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I do not want to be cancelled!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))); await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); List tasks = testTasksService.getWaitingTasks("test", null); Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow(); - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) - ); + tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())); await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty()); @@ -135,14 +124,11 @@ void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) { var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId)); assertFalse( - transactionsHelper.withTransaction().asNew().call(() -> - tasksService.cancelTask( - new ITasksService.CancelTaskRequest() - .setTaskId(taskId) - .setVersion(finalTask.getVersion()) - ) - ) - ); + tasksManagementService.cancelTask( + new ITasksManagementService.CancelTaskRequest() + .setTaskVersionId(new TaskVersionId().setId(taskId).setVersion(finalTask.getVersion())) + )); + assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount()); assertEquals(0, getTaskCancelledCount()); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 2ed41a72..6b31c7ef 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -150,18 +150,4 @@ public enum Result { enum TasksProcessingState { STARTED, STOPPED, STOP_IN_PROGRESS } - - /** - * Cancel a task - * - *

If the task is not found or not in WAITING state, false is returned. - */ - boolean cancelTask(CancelTaskRequest request); - - @Data - @Accessors(chain = true) - class CancelTaskRequest { - private UUID taskId; - private long version; - } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 62cb1274..c32b8077 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -289,47 +289,6 @@ public ITasksService.TasksProcessingState getTasksProcessingState(String bucketI return tasksExecutionTriggerer.getTasksProcessingState(bucketId); } - @Override - @EntryPoint(usesExisting = true) - @Transactional(rollbackFor = Exception.class) - public boolean cancelTask(CancelTaskRequest request) { - return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.CANCEL_TASK, - () -> { - UUID taskId = request.getTaskId(); - mdcService.put(request.getTaskId(), request.getVersion()); - - FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); - - if (task == null) { - log.debug("Cannot cancel task '" + taskId + "' as it was not found."); - return false; - } - - mdcService.put(task); - - long version = task.getVersion(); - - if (version != request.getVersion()) { - coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); - log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); - return false; - } - - if (task.getStatus().equals(TaskStatus.WAITING.name())) { - if (!taskDao.setStatus(taskId, TaskStatus.CANCELLED, version)) { - coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); - return false; - } else { - coreMetricsTemplate.registerTaskCancelled(null, task.getType()); - return true; - } - } - - coreMetricsTemplate.registerTaskCancellationFailure(null, task.getType()); - return false; - }); - } - /** * We register an after commit hook here, so as soon as transaction has finished, the task will be triggered. * diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskVersionId.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskVersionId.java index 7e095918..2fca3854 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskVersionId.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskVersionId.java @@ -21,6 +21,7 @@ @JsonDeserialize(using = TaskVersionId.TaskVersionIdJsonDeserializer.class) public class TaskVersionId implements ITaskVersionId { + // Task ID private UUID id; private long version; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index c2380c68..61a80a17 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -537,13 +537,13 @@ protected static class MetricHandle { } - public void registerTaskCancelled(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + public void registerTaskCancelled(String taskType) { + meterCache.counter(METRIC_TASKS_CANCELLED_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(null), TAG_TASK_TYPE, taskType)) .increment(); } - public void registerTaskCancellationFailure(String bucketId, String taskType) { - meterCache.counter(METRIC_TASKS_FAILED_CANCELLATION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType)) + public void registerTaskCancellationFailure(String taskType) { + meterCache.counter(METRIC_TASKS_FAILED_CANCELLATION_COUNT, TagsSet.of(TAG_BUCKET_ID, resolveBucketId(null), TAG_TASK_TYPE, taskType)) .increment(); } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index 240e739b..fcf74cfa 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -127,8 +127,8 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun @SuppressWarnings("rawtypes") void registerKafkaProducer(Producer producer); - void registerTaskCancelled(String bucketId, String taskType); + void registerTaskCancelled(String taskType); - void registerTaskCancellationFailure(String bucketId, String taskType); + void registerTaskCancellationFailure(String taskType); } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java index c9617013..2c195674 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java @@ -228,4 +228,17 @@ public static class TaskType { private List subTypes; } } + + /** + * Cancel a task + * + *

If the task is not found or not in WAITING state, false is returned. + */ + boolean cancelTask(CancelTaskRequest request); + + @Data + @Accessors(chain = true) + class CancelTaskRequest { + private TaskVersionId taskVersionId; + } } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java index 3e2122bc..2ba2a531 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java @@ -6,6 +6,8 @@ import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import com.transferwise.tasks.entrypoints.EntryPoint; +import com.transferwise.tasks.entrypoints.EntryPointsGroups; +import com.transferwise.tasks.entrypoints.EntryPointsNames; import com.transferwise.tasks.entrypoints.IEntryPointsService; import com.transferwise.tasks.entrypoints.IMdcService; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; @@ -269,4 +271,45 @@ public GetTaskTypesResponse getTaskTypes(List status) { .collect(Collectors.toList())); }); } + + @Override + @EntryPoint(usesExisting = true) + public boolean cancelTask(CancelTaskRequest request) { + return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.CANCEL_TASK, + () -> { + TaskVersionId taskVersionId = request.getTaskVersionId(); + mdcService.put(taskVersionId.getId(), taskVersionId.getVersion()); + + FullTaskRecord task = taskDao.getTask(taskVersionId.getId(), FullTaskRecord.class); + + if (task == null) { + log.debug("Cannot cancel task '" + taskVersionId.getId() + "' as it was not found."); + return false; + } + + mdcService.put(task); + + long version = task.getVersion(); + + if (version != request.getTaskVersionId().getVersion()) { + coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + log.debug("Expected version " + request.getTaskVersionId().getVersion() + " does not match " + version + "."); + return false; + } + + if (task.getStatus().equals(TaskStatus.WAITING.name())) { + if (!taskDao.setStatus(taskVersionId.getId(), TaskStatus.CANCELLED, version)) { + coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + return false; + } else { + coreMetricsTemplate.registerTaskCancelled( task.getType()); + return true; + } + } + + coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + return false; + }); + } + } From 96157ff06043a9e47bee6d4ac0f3127ff964d396 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 12 Feb 2025 17:09:16 +0000 Subject: [PATCH 11/12] update changelog: --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index caecebec..929214bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 1.50.0 - 2025/02/07 +## 1.50.0 - 2025/02/12 ### Changed From 55d13d50e8076b6f35f50ddb00979702162e34b1 Mon Sep 17 00:00:00 2001 From: Sep Taheri Date: Wed, 12 Feb 2025 17:16:31 +0000 Subject: [PATCH 12/12] cehckstyle --- .../tasks/management/TasksManagementService.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java index 2ba2a531..ffc9ac8b 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java @@ -292,24 +292,23 @@ public boolean cancelTask(CancelTaskRequest request) { long version = task.getVersion(); if (version != request.getTaskVersionId().getVersion()) { - coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + coreMetricsTemplate.registerTaskCancellationFailure(task.getType()); log.debug("Expected version " + request.getTaskVersionId().getVersion() + " does not match " + version + "."); return false; } if (task.getStatus().equals(TaskStatus.WAITING.name())) { if (!taskDao.setStatus(taskVersionId.getId(), TaskStatus.CANCELLED, version)) { - coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + coreMetricsTemplate.registerTaskCancellationFailure(task.getType()); return false; } else { - coreMetricsTemplate.registerTaskCancelled( task.getType()); + coreMetricsTemplate.registerTaskCancelled(task.getType()); return true; } } - coreMetricsTemplate.registerTaskCancellationFailure( task.getType()); + coreMetricsTemplate.registerTaskCancellationFailure(task.getType()); return false; }); } - }