From 91d066081a195d3f59c85367946a03da36f6c8e6 Mon Sep 17 00:00:00 2001 From: Gabriel Taets <70316822+gabrieltaets-tw@users.noreply.github.com> Date: Wed, 13 Dec 2023 15:13:42 +0200 Subject: [PATCH] add endpoint to get task types (#192) * add endpoint to get task types * fix style * accept collection of task types and subtypes --- CHANGELOG.md | 5 + gradle.properties | 2 +- .../dao/ManagementTaskDaoIntTest.java | 6 +- .../tasks/testapp/TaskProcessingIntTest.java | 2 +- .../testapp/TasksManagementPortIntTest.java | 164 ++++++++++++++++++ .../management/ITasksManagementPort.java | 25 ++- .../management/ITasksManagementService.java | 21 +++ .../management/ManagementEntryPointNames.java | 1 + .../TasksManagementPortController.java | 16 ++ .../management/TasksManagementService.java | 23 ++- .../management/dao/IManagementTaskDao.java | 15 +- .../management/dao/JdbcManagementTaskDao.java | 160 +++++++++++++++-- 12 files changed, 407 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eee8643..5433f100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +#### 1.41.0 - 2023/11/16 +### Added +- Added `taskType` and `taskSubType` parameters to management query endpoints. +- Added `/getTaskTypes` endpoint to retrieve list of registered task types and sub-types + ## 1.40.6 - 2023/11/16 ### Fixed diff --git a/gradle.properties b/gradle.properties index bb994dd0..4ef47a12 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.40.6 +version=1.41.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/ext/management/dao/ManagementTaskDaoIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/ext/management/dao/ManagementTaskDaoIntTest.java index 805525e9..e2b2f1c6 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/ext/management/dao/ManagementTaskDaoIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/ext/management/dao/ManagementTaskDaoIntTest.java @@ -77,7 +77,7 @@ void gettingTasksInErrorStatusReturnsTasksInTheLimitOfMaxCount() { randomProcessingTask().withSubType("3").save(); randomErrorTask().withSubType("4").save(); - List tasks = managementTaskDao.getTasksInErrorStatus(2); + List tasks = managementTaskDao.getTasksInErrorStatus(2, null, null); assertEquals(2, tasks.size()); for (DaoTask1 task : tasks) { @@ -93,7 +93,7 @@ void gettingTasksInProcessingOrWaitingStatusReturnsTasksInTheLimitOfMaxCount() { randomProcessingTask().withSubType("4").save(); randomWaitingTask().withSubType("5").save(); - List tasks = managementTaskDao.getTasksInProcessingOrWaitingStatus(3); + List tasks = managementTaskDao.getTasksInProcessingOrWaitingStatus(3, null, null); assertEquals(3, tasks.size()); assertEquals(3, tasks.stream().filter(t -> ImmutableSet.of("1", "3", "4", "5").contains(t.getSubType())).count()); @@ -114,7 +114,7 @@ void gettingStuckTasksReturnsTasksInTheLimitOfMaxCount() { randomDoneTask().save(); randomWaitingTask().save(); - List tasks = managementTaskDao.getStuckTasks(4, Duration.ofMillis(-2)); + List tasks = managementTaskDao.getStuckTasks(4, null, null, Duration.ofMillis(-2)); assertEquals(4, tasks.size()); } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java index 6aa4f23b..f8f4a0e7 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java @@ -214,7 +214,7 @@ public ISyncTaskProcessor.ProcessResult process(ITask task) { } List error = transactionsHelper.withTransaction().asNew().call(() -> - managementTaskDao.getTasksInErrorStatus(10) + managementTaskDao.getTasksInErrorStatus(10, null, null) ); boolean taskWasMarkedAsError = error.size() != 0 && error.get(0).getId().equals(taskRef.get().getTaskId()); if (taskWasMarkedAsError) { diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java index 6a83a944..abfaf92f 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TasksManagementPortIntTest.java @@ -17,9 +17,12 @@ import com.transferwise.tasks.management.ITasksManagementPort; import com.transferwise.tasks.management.ITasksManagementPort.GetTaskDataResponse; import com.transferwise.tasks.management.ITasksManagementPort.GetTaskDataResponse.ResultCode; +import com.transferwise.tasks.management.ITasksManagementPort.GetTaskTypesResponse; import com.transferwise.tasks.management.ITasksManagementPort.GetTaskWithoutDataResponse; import com.transferwise.tasks.management.ITasksManagementPort.GetTasksInErrorResponse; import com.transferwise.tasks.management.ITasksManagementPort.GetTasksInErrorResponse.TaskInError; +import com.transferwise.tasks.management.ITasksManagementPort.GetTasksInProcessingOrWaitingResponse; +import com.transferwise.tasks.management.ITasksManagementPort.GetTasksInProcessingOrWaitingResponse.TaskInProcessingOrWaiting; import com.transferwise.tasks.management.ITasksManagementPort.GetTasksStuckResponse; import com.transferwise.tasks.management.ITasksManagementPort.GetTasksStuckResponse.TaskStuck; import java.time.ZonedDateTime; @@ -354,6 +357,167 @@ void immediatelyResumingAllTasksWorks() { assertFalse(task.getNextEventTime().isAfter(ZonedDateTime.now(TwContextClockHolder.getClock()).plusSeconds(1))); } + @Test + void filtersErroredTasksByTypeAndSubType() { + final UUID taskId = transactionsHelper.withTransaction().asNew().call(() -> { + TaskTestBuilder.newTask().inStatus(TaskStatus.ERROR).withMaxStuckTime(ZonedDateTime.now().plusDays(2)).save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.ERROR).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("B") + .withSubType("SUB") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.ERROR).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("BAD") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.ERROR).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withSubType("SUB") + .save(); + return TaskTestBuilder.newTask().inStatus(TaskStatus.ERROR).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("SUB") + .save() + .getTaskId(); + }); + + ResponseEntity response = goodEngineerTemplate().postForEntity( + "/v1/twTasks/getTasksInError", + new ITasksManagementPort.GetTasksInErrorRequest().setMaxCount(10) + .setTaskTypes(List.of("A")) + .setTaskSubTypes(List.of("SUB")), + GetTasksInErrorResponse.class + ); + + assertEquals(200, response.getStatusCodeValue()); + GetTasksInErrorResponse tasksInErrorResponse = response.getBody(); + assertNotNull(tasksInErrorResponse); + List tasksInError = tasksInErrorResponse.getTasksInError(); + assertEquals(1, tasksInError.size()); + assertEquals(taskId, tasksInError.get(0).getTaskVersionId().getId()); + assertEquals("A", tasksInError.get(0).getType()); + assertEquals("SUB", tasksInError.get(0).getSubType()); + } + + @Test + void filtersStuckTasksByTypeAndSubType() { + testTasksService.stopProcessing(); + + final UUID taskId = transactionsHelper.withTransaction().asNew().call(() -> { + TaskTestBuilder.newTask().inStatus(TaskStatus.PROCESSING).withMaxStuckTime(ZonedDateTime.now().minusDays(2)).save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.PROCESSING).withMaxStuckTime(ZonedDateTime.now().minusDays(2)) + .withType("B") + .withSubType("SUB") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.PROCESSING).withMaxStuckTime(ZonedDateTime.now().minusDays(2)) + .withType("A") + .withSubType("BAD") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.PROCESSING).withMaxStuckTime(ZonedDateTime.now().minusDays(2)) + .withSubType("SUB") + .save(); + return TaskTestBuilder.newTask().inStatus(TaskStatus.PROCESSING).withMaxStuckTime(ZonedDateTime.now().minusDays(2)) + .withType("A") + .withSubType("SUB") + .save() + .getTaskId(); + }); + + ResponseEntity response = goodEngineerTemplate().postForEntity( + "/v1/twTasks/getTasksStuck", + new ITasksManagementPort.GetTasksStuckRequest().setMaxCount(10) + .setTaskTypes(List.of("A")) + .setTaskSubTypes(List.of("SUB")), + ITasksManagementPort.GetTasksStuckResponse.class + ); + + assertEquals(200, response.getStatusCodeValue()); + GetTasksStuckResponse stuckTasksResponse = response.getBody(); + assertNotNull(stuckTasksResponse); + List tasksStuck = stuckTasksResponse.getTasksStuck(); + assertEquals(1, tasksStuck.size()); + assertEquals(taskId, tasksStuck.get(0).getTaskVersionId().getId()); + } + + @Test + void filtersWaitingTasksByTypeAndSubType() { + final UUID taskId = transactionsHelper.withTransaction().asNew().call(() -> { + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)).save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("B") + .withSubType("SUB") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("BAD") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withSubType("SUB") + .save(); + return TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("SUB") + .save() + .getTaskId(); + }); + + ResponseEntity response = goodEngineerTemplate().postForEntity( + "/v1/twTasks/getTasksInProcessingOrWaiting", + new ITasksManagementPort.GetTasksInProcessingOrWaitingRequest().setMaxCount(10) + .setTaskTypes(List.of("A")) + .setTaskSubTypes(List.of("SUB")), + GetTasksInProcessingOrWaitingResponse.class + ); + + assertEquals( + 200, response.getStatusCodeValue()); + GetTasksInProcessingOrWaitingResponse waitingTasksResponse = response.getBody(); + assertNotNull(waitingTasksResponse); + List tasksWaiting = waitingTasksResponse.getTasksInProcessingOrWaiting(); + assertEquals(1, tasksWaiting.size()); + assertEquals(taskId, tasksWaiting.get(0).getTaskVersionId().getId()); + assertEquals("A", tasksWaiting.get(0).getType()); + assertEquals("SUB", tasksWaiting.get(0).getSubType()); + } + + @Test + void getTaskTypesWillReturnCorrectly() { + transactionsHelper.withTransaction().asNew().run(() -> { + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("SUB-2") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .withSubType("SUB-1") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("B") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("A") + .save(); + TaskTestBuilder.newTask().inStatus(TaskStatus.WAITING).withMaxStuckTime(ZonedDateTime.now().plusDays(2)) + .withType("B") + .save(); + }); + + ResponseEntity response = goodEngineerTemplate().getForEntity( + "/v1/twTasks/getTaskTypes", + GetTaskTypesResponse.class + ); + + assertEquals(200, response.getStatusCodeValue()); + GetTaskTypesResponse typesResponse = response.getBody(); + assertNotNull(typesResponse); + List types = typesResponse.getTypes(); + assertEquals(2, types.size()); + assertEquals("A", types.get(0).getType()); + assertEquals("B", types.get(1).getType()); + assertEquals(2, types.get(0).getSubTypes().size()); + assertEquals("SUB-1", types.get(0).getSubTypes().get(0)); + assertEquals("SUB-2", types.get(0).getSubTypes().get(1)); + assertTrue(types.get(1).getSubTypes().isEmpty()); + } + private TestRestTemplate goodEngineerTemplate() { return testRestTemplate.withBasicAuth("goodEngineer", "q1w2e3r4"); } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementPort.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementPort.java index 0909c893..29931ebe 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementPort.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementPort.java @@ -140,6 +140,11 @@ ResponseEntity getTasksInProcessingOrWait @ResponseBody ResponseEntity getTasksById(@RequestBody GetTasksByIdRequest request); + @GetMapping(value = "${tw-tasks.core.base-url:}/v1/twTasks/getTaskTypes", produces = {MediaType.APPLICATION_JSON_VALUE}) + @ResponseBody + ResponseEntity getTaskTypes(); + + @Data @Accessors(chain = true) class GetTasksByIdRequest { @@ -186,8 +191,9 @@ class GetTaskWithoutDataResponse { @Data @Accessors(chain = true) class GetTasksInErrorRequest { - private int maxCount; + List taskTypes; + List taskSubTypes; } @Data @@ -195,6 +201,8 @@ class GetTasksInErrorRequest { class GetTasksInProcessingOrWaitingRequest { private int maxCount; + List taskTypes; + List taskSubTypes; } @Data @@ -241,6 +249,8 @@ public static class TaskInError { class GetTasksStuckRequest { private int maxCount; + List taskTypes; + List taskSubTypes; } @Data @@ -257,4 +267,17 @@ public static class TaskStuck { private Instant stuckTime; } } + + @Data + @Accessors(chain = true) + class GetTaskTypesResponse { + private List types; + + @Data + @Accessors(chain = true) + public static class TaskType { + private String type; + private List subTypes; + } + } } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java index c772579c..265531c8 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ITasksManagementService.java @@ -110,6 +110,8 @@ public static class Result { class GetTasksInErrorRequest { private int maxCount; + private List taskTypes; + private List taskSubTypes; } @Data @@ -137,6 +139,8 @@ class GetTasksStuckRequest { private int maxCount; private Duration delta; + private List taskTypes; + private List taskSubTypes; } @Data @@ -161,6 +165,8 @@ public static class TaskStuck { class GetTasksInProcessingOrWaitingRequest { private int maxCount = 10; + private List taskTypes; + private List taskSubTypes; } @Data @@ -207,4 +213,19 @@ public static class Task { private Instant stateTime; } } + + GetTaskTypesResponse getTaskTypes(); + + @Data + @Accessors(chain = true) + class GetTaskTypesResponse { + private List types; + + @Data + @Accessors(chain = true) + public static class TaskType { + private String type; + private List subTypes; + } + } } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ManagementEntryPointNames.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ManagementEntryPointNames.java index 62cf8b8a..3795932c 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ManagementEntryPointNames.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/ManagementEntryPointNames.java @@ -11,6 +11,7 @@ public final class ManagementEntryPointNames { public static final String MARK_AS_FAILED = "markAsFailed"; public static final String GET_TASK_WITHOUT_DATA = "getTaskWithoutData"; public static final String GET_TASK_DATA = "getTaskData"; + public static final String GET_TASKS_TYPES = "getTasksTypes"; private ManagementEntryPointNames() { throw new AssertionError(); diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementPortController.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementPortController.java index 99faa433..863683bf 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementPortController.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementPortController.java @@ -99,6 +99,8 @@ public ResponseEntity getTasksInError(@RequestBody(requ return callWithAuthentication(() -> { ITasksManagementService.GetTasksInErrorResponse serviceResponse = tasksManagementService .getTasksInError(new ITasksManagementService.GetTasksInErrorRequest() + .setTaskTypes(request != null ? request.getTaskTypes() : null) + .setTaskSubTypes(request != null ? request.getTaskSubTypes() : null) .setMaxCount(request != null ? request.getMaxCount() : DEFAULT_MAX_COUNT)); return ResponseEntity.ok(new GetTasksInErrorResponse().setTasksInError(serviceResponse.getTasksInError().stream().map(taskInError -> @@ -140,6 +142,8 @@ public ResponseEntity getTasksInProcessin return callWithAuthentication(() -> { ITasksManagementService.GetTasksInProcessingOrWaitingResponse serviceResponse = tasksManagementService .getTasksInProcessingOrWaiting(new ITasksManagementService.GetTasksInProcessingOrWaitingRequest() + .setTaskTypes(request != null ? request.getTaskTypes() : null) + .setTaskSubTypes(request != null ? request.getTaskSubTypes() : null) .setMaxCount(request != null ? request.getMaxCount() : DEFAULT_MAX_COUNT)); return ResponseEntity.ok(new GetTasksInProcessingOrWaitingResponse() @@ -184,6 +188,8 @@ public ResponseEntity getTasksStuck(@RequestBody(required return callWithAuthentication(() -> { ITasksManagementService.GetTasksStuckResponse serviceResponse = tasksManagementService .getTasksStuck(new ITasksManagementService.GetTasksStuckRequest() + .setTaskTypes(request != null ? request.getTaskTypes() : null) + .setTaskSubTypes(request != null ? request.getTaskSubTypes() : null) .setMaxCount(request != null ? request.getMaxCount() : DEFAULT_MAX_COUNT)); return ResponseEntity.ok(new GetTasksStuckResponse().setTasksStuck(serviceResponse.getTasksStuck().stream().map(taskStuck -> @@ -192,6 +198,16 @@ public ResponseEntity getTasksStuck(@RequestBody(required }); } + @Override + public ResponseEntity getTaskTypes() { + return callWithAuthentication(() -> { + ITasksManagementService.GetTaskTypesResponse serviceResponse = tasksManagementService.getTaskTypes(); + + return ResponseEntity.ok(new GetTaskTypesResponse().setTypes(serviceResponse.getTypes().stream().map(type -> + new GetTaskTypesResponse.TaskType().setType(type.getType()).setSubTypes(type.getSubTypes())).collect(Collectors.toList()))); + }); + } + protected T callWithAuthentication(Supplier supplier) { return callWithAuthentication(tasksProperties.getTasksManagement().getRoles(), (auth) -> supplier.get()); } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java index c8125d38..4100f0e5 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java @@ -17,6 +17,7 @@ import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask1; import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask2; import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask3; +import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTaskType; import com.transferwise.tasks.utils.LogUtils; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -123,7 +124,7 @@ public ResumeTasksImmediatelyResponse resumeAllTasksImmediately(ResumeAllTasksIm return response; } - List tasksInError = managementTaskDao.getTasksInErrorStatus(request.getMaxCount()); + List tasksInError = managementTaskDao.getTasksInErrorStatus(request.getMaxCount(), List.of(request.getTaskType()), null); List taskVersionIdsToResume = tasksInError.stream() .filter(t -> t.getType().equals(request.getTaskType())) .map(t -> new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) @@ -138,7 +139,7 @@ public ResumeTasksImmediatelyResponse resumeAllTasksImmediately(ResumeAllTasksIm public GetTasksInErrorResponse getTasksInError(GetTasksInErrorRequest request) { return entryPointsHelper .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.GET_TASKS_IN_ERROR, () -> { - List tasks = managementTaskDao.getTasksInErrorStatus(request.getMaxCount()); + List tasks = managementTaskDao.getTasksInErrorStatus(request.getMaxCount(), request.getTaskTypes(), request.getTaskSubTypes()); return new GetTasksInErrorResponse().setTasksInError( tasks.stream().map(t -> new GetTasksInErrorResponse.TaskInError() @@ -155,8 +156,8 @@ public GetTasksInErrorResponse getTasksInError(GetTasksInErrorRequest request) { public GetTasksStuckResponse getTasksStuck(GetTasksStuckRequest request) { return entryPointsHelper .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.GET_TASKS_STUCK, () -> { - List tasks = managementTaskDao.getStuckTasks(request.getMaxCount(), request.getDelta() == null - ? Duration.ofSeconds(10) : request.getDelta()); + List tasks = managementTaskDao.getStuckTasks(request.getMaxCount(), request.getTaskTypes(), request.getTaskSubTypes(), + request.getDelta() == null ? Duration.ofSeconds(10) : request.getDelta()); return new GetTasksStuckResponse().setTasksStuck( tasks.stream().map(t -> new GetTasksStuckResponse.TaskStuck() @@ -172,7 +173,8 @@ public GetTasksInProcessingOrWaitingResponse getTasksInProcessingOrWaiting(GetTa return entryPointsHelper .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.GET_TASKS_IN_PROCESSING_OR_WAITING, () -> { - List tasks = managementTaskDao.getTasksInProcessingOrWaitingStatus(request.getMaxCount()); + List tasks = managementTaskDao.getTasksInProcessingOrWaitingStatus( + request.getMaxCount(), request.getTaskTypes(), request.getTaskSubTypes()); return new GetTasksInProcessingOrWaitingResponse().setTasksInProcessingOrWaiting( tasks.stream().map(t -> new GetTasksInProcessingOrWaitingResponse.TaskInProcessingOrWaiting() .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) @@ -255,4 +257,15 @@ public GetTaskDataResponse getTaskData(GetTaskDataRequest request) { return response; }); } + + @Override + public GetTaskTypesResponse getTaskTypes() { + return entryPointsHelper + .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.GET_TASKS_TYPES, () -> { + List types = managementTaskDao.getTaskTypes(); + return new GetTaskTypesResponse().setTypes( + types.stream().map(t -> new GetTaskTypesResponse.TaskType().setType(t.getType()).setSubTypes(t.getSubTypes())) + .collect(Collectors.toList())); + }); + } } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/IManagementTaskDao.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/IManagementTaskDao.java index dc9d0a84..76ff6c27 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/IManagementTaskDao.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/IManagementTaskDao.java @@ -43,13 +43,22 @@ class DaoTask3 { private ZonedDateTime nextEventTime; } - List getTasksInErrorStatus(int maxCount); + @Data + @Accessors(chain = true) + class DaoTaskType { + private String type; + private List subTypes; + } + + List getTasksInErrorStatus(int maxCount, List taskType, List taskSubType); boolean scheduleTaskForImmediateExecution(UUID taskId, long version); - List getStuckTasks(int maxCount, Duration delta); + List getStuckTasks(int maxCount, List taskTypes, List taskSubTypes, Duration delta); - List getTasksInProcessingOrWaitingStatus(int maxCount); + List getTasksInProcessingOrWaitingStatus(int maxCount, List taskTypes, List taskSubTypes); List getTasks(List uuids); + + List getTaskTypes(); } diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/JdbcManagementTaskDao.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/JdbcManagementTaskDao.java index 44e1892f..100faea4 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/JdbcManagementTaskDao.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/dao/JdbcManagementTaskDao.java @@ -1,5 +1,10 @@ package com.transferwise.tasks.management.dao; +import static java.util.stream.Collectors.filtering; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; + import com.transferwise.common.context.TwContextClockHolder; import com.transferwise.tasks.dao.ITaskDaoDataSerializer; import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData; @@ -10,24 +15,31 @@ import com.transferwise.tasks.helpers.sql.ArgumentPreparedStatementSetter; import com.transferwise.tasks.helpers.sql.CacheKey; import com.transferwise.tasks.helpers.sql.SqlHelper; +import com.transferwise.tasks.management.dao.JdbcManagementTaskDao.Queries.QueryBuilder; +import com.transferwise.tasks.management.dao.JdbcManagementTaskDao.Queries.QueryBuilder.Op; import com.transferwise.tasks.utils.TimeUtils; import java.sql.Timestamp; import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import javax.sql.DataSource; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementSetter; import org.springframework.transaction.annotation.Transactional; + public class JdbcManagementTaskDao implements IManagementTaskDao { - private static class Queries { + static class Queries { static final String GET_TASKS = "getTasks"; @@ -36,20 +48,79 @@ private static class Queries { final String getStuckTasks; final String getTasksInStatus; final String getTasks; + final String getTaskTypes; Queries(ITwTaskTables tables) { scheduleTaskForImmediateExecution = "update " + tables.getTaskTableIdentifier() + " set status=?" - + ",next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=?"; - getTasksInErrorStatus = "select id,version,state_time,type,sub_type from " + tables.getTaskTableIdentifier() - + " where status='" + TaskStatus.ERROR.name() + "' order by next_event_time desc limit ?"; - getStuckTasks = "select id,version,next_event_time from " + tables.getTaskTableIdentifier() + " where status=?" - + " and next_event_time getTasksInErrorStatus(int maxCount) { + public List getTasksInErrorStatus(int maxCount, List taskTypes, List taskSubTypes) { + QueryBuilder builder = Queries.queryBuilder(queries.getTasksInErrorStatus) + .and("status") + .desc("next_event_time") + .withLimit(); + + addTaskTypeArgs(builder, taskTypes, taskSubTypes); return jdbcTemplate.query( - queries.getTasksInErrorStatus, - args(maxCount), + builder.build(), + args(TaskStatus.ERROR.name(), taskTypes, taskSubTypes, maxCount), (rs, rowNum) -> new DaoTask1() .setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) @@ -99,23 +176,37 @@ public List getTasksInErrorStatus(int maxCount) { @Override @Transactional(rollbackFor = Exception.class) public boolean scheduleTaskForImmediateExecution(UUID taskId, long version) { + if (taskId == null) { + throw new IllegalArgumentException("taskId may not be null"); + } Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock())); + String query = Queries.queryBuilder(queries.scheduleTaskForImmediateExecution) + .and("id") + .and("version") + .build(); return jdbcTemplate.update( - queries.scheduleTaskForImmediateExecution, + query, args(TaskStatus.WAITING, now, now, now, version + 1, taskId, version) ) == 1; } @Override - public List getStuckTasks(int maxCount, Duration delta) { + public List getStuckTasks(int maxCount, List taskTypes, List taskSubTypes, Duration delta) { Timestamp timeThreshold = Timestamp.from(ZonedDateTime.now(TwContextClockHolder.getClock()).toInstant().minus(delta)); List stuckTasks = new ArrayList<>(); + QueryBuilder builder = Queries.queryBuilder(queries.getStuckTasks) + .and("status") + .and("next_event_time", Op.LESS_THAN) + .desc("next_event_time") + .withLimit(); + addTaskTypeArgs(builder, taskTypes, taskSubTypes); + String query = builder.build(); for (TaskStatus taskStatus : STUCK_STATUSES) { stuckTasks.addAll( jdbcTemplate.query( - queries.getStuckTasks, - args(taskStatus, timeThreshold, maxCount), + query, + args(taskStatus.name(), timeThreshold, taskTypes, taskSubTypes, maxCount), (rs, rowNum) -> new DaoTask2() .setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) @@ -131,13 +222,19 @@ public List getStuckTasks(int maxCount, Duration delta) { } @Override - public List getTasksInProcessingOrWaitingStatus(int maxCount) { + public List getTasksInProcessingOrWaitingStatus(int maxCount, List taskTypes, List taskSubTypes) { List result = new ArrayList<>(); + QueryBuilder builder = Queries.queryBuilder(queries.getTasksInStatus) + .and("status") + .desc("next_event_time") + .withLimit(); + addTaskTypeArgs(builder, taskTypes, taskSubTypes); + String query = builder.build(); for (TaskStatus taskStatus : WAITING_AND_PROCESSING_STATUSES) { result.addAll( jdbcTemplate.query( - queries.getTasksInStatus, - args(taskStatus, maxCount), + query, + args(taskStatus, taskTypes, taskSubTypes, maxCount), (rs, rowNum) -> new DaoTask3() .setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) @@ -211,7 +308,32 @@ public List getTasks(List taskIds) { } } + @Override + public List getTaskTypes() { + List> types = jdbcTemplate.query( + queries.getTaskTypes, + (rs, rowNum) -> ImmutablePair.of(rs.getString(1), rs.getString(2))); + + return types.stream() + .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, filtering(Objects::nonNull, toList())))) + .entrySet() + .stream() + .map(entry -> new DaoTaskType().setType(entry.getKey()).setSubTypes(entry.getValue().stream().sorted().collect(toList()))) + .sorted(Comparator.comparing(DaoTaskType::getType)) + .collect(toList()); + } + + protected void addTaskTypeArgs(QueryBuilder builder, List taskTypes, List taskSubTypes) { + if (taskTypes != null && !taskTypes.isEmpty()) { + builder.and("type", Op.IN); + } + if (taskSubTypes != null && !taskSubTypes.isEmpty()) { + builder.and("sub_type", Op.IN); + } + } + protected PreparedStatementSetter args(Object... args) { - return new ArgumentPreparedStatementSetter(sqlMapper::uuidToSqlTaskId, args); + Object[] filtered = Arrays.stream(args).filter(Objects::nonNull).toArray(); + return new ArgumentPreparedStatementSetter(sqlMapper::uuidToSqlTaskId, filtered); } }