Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAA-2356: Cancel tasks implementation #221

Merged
merged 12 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ build

.idea
/demoapp/src/main/resources/application-rds.yml
*.iml
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.49.1 - 2025/02/07

### Changed

- Added support to cancel tasks in waiting and to delete tasks.


## 1.49.0 - 2025/01/08

### Changed
Expand All @@ -18,7 +25,7 @@ It is worth keeping an eye on:
- changes to assignors used, log `Successfully synced group in generation Generation`
- on assignment strategy failures on consumers in prod and [consumer state](https://dashboards.tw.ee/d/f7094f30-a509-4592-aced-37584a70132a/kafka-consumer-groups-and-lag-details-kminion?orgId=1&refresh=30s&viewPanel=14).

If you use `com.wise.kafka.assignors.CanaryAwareRangeAssignor`, consider setting this config:
If you use `com.wise.kafka.assignors.CanaryAwareRangeAssignor`, consider setting this config:
```
spring.kafka.consumer.properties.partition.assignment.strategy:
com.wise.kafka.assignors.CanaryAwareRangeAssignor, org.apache.kafka.clients.consumer.RangeAssignor, org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE tw_task
(
id BINARY(16) PRIMARY KEY NOT NULL,
status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'),
status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED', 'CANCELLED'),
-- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1
next_event_time DATETIME(6) NOT NULL,
state_time DATETIME(3) NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion docs/support.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Support

## Support for integration tests
For making integration testing easy and fun, the following support classes can be used. They are especially convinient for
For making integration testing easy and fun, the following support classes can be used. They are especially convenient for
tasks with Json payload.

- `IToKafkaTestHelper`
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.49.0
version=1.49.1
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package com.transferwise.tasks.testapp;

import static com.transferwise.tasks.domain.TaskStatus.WAITING;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITaskDataSerializer;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.ITasksService.GetTaskRequest;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.test.ITestTasksService;
import io.micrometer.core.instrument.Counter;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class TaskCancellationIntTest extends BaseIntTest {

@Autowired
private ITasksService tasksService;
@Autowired
private ITestTasksService testTasksService;
@Autowired
private ITaskDataSerializer taskDataSerializer;
@Autowired
private ITaskDao taskDao;

@BeforeEach
void setup() {
transactionsHelper.withTransaction().asNew().call(() -> {
testTasksService.reset();
return null;
});
}

@Test
void taskCanBeSuccessfullyCancelled() {
UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I want to be cancelled"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());

var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertTrue(transactionsHelper.withTransaction().asNew().call(() ->
tasksService.cancelTask(
new ITasksService.CancelTaskRequest()
.setTaskId(taskId)
.setVersion(task.getVersion())
))
);

await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty());
assertEquals(0, getFailedCancellationCount());
assertEquals(1, getTaskCancelledCount());
}

@Test
void taskWillNotBeCancelledIfVersionHasAlreadyChanged() {
final long initialFailedCancellationCount = getFailedCancellationCount();
final UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I want to be cancelled too!"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());

var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertFalse(
transactionsHelper.withTransaction().asNew().call(() ->
tasksService.cancelTask(
new ITasksService.CancelTaskRequest()
.setTaskId(taskId)
.setVersion(task.getVersion() - 1)
)
)
);
assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount());
assertEquals(0, getTaskCancelledCount());
}

@ParameterizedTest
@EnumSource(value = TaskStatus.class,
names = {"WAITING", "UNKNOWN"},
mode = EnumSource.Mode.EXCLUDE)
void taskWillNotBeCancelledIfNotWaiting(TaskStatus status) {
final long initialFailedCancellationCount = getFailedCancellationCount();
final UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I do not want to be cancelled!"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());
List<Task> tasks = testTasksService.getWaitingTasks("test", null);
Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion()))
);

await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty());

var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

taskDao.setStatus(taskId, status, updateTask.getVersion());

var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertFalse(
transactionsHelper.withTransaction().asNew().call(() ->
tasksService.cancelTask(
new ITasksService.CancelTaskRequest()
.setTaskId(taskId)
.setVersion(finalTask.getVersion())
)
)
);
assertEquals(initialFailedCancellationCount + 1, getFailedCancellationCount());
assertEquals(0, getTaskCancelledCount());
}

private long getFailedCancellationCount() {
Counter counter = meterRegistry.find("twTasks.tasks.failedCancellationCount").tags(
"taskType", "test"
).counter();

if (counter == null) {
return 0;
} else {
return (long) counter.count();
}
}

private long getTaskCancelledCount() {
Counter counter = meterRegistry.find("twTasks.tasks.cancelledCount").tags(
"taskType", "test"
).counter();

if (counter == null) {
return 0;
} else {
return (long) counter.count();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.transferwise.tasks.testapp;

import static com.transferwise.tasks.domain.TaskStatus.WAITING;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITaskDataSerializer;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.ITasksService.GetTaskRequest;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.test.ITestTasksService;
import io.micrometer.core.instrument.Counter;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class TaskDeletionIntTest extends BaseIntTest {

@Autowired
private ITasksService tasksService;
@Autowired
private ITestTasksService testTasksService;
@Autowired
private ITaskDataSerializer taskDataSerializer;
@Autowired
private ITaskDao taskDao;

@BeforeEach
void setup() {
transactionsHelper.withTransaction().asNew().call(() -> {
testTasksService.reset();
return null;
});
}

@Test
void taskCanBeSuccessfullyDeleted() {
UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I want to be deleted"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());

var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertTrue(transactionsHelper.withTransaction().asNew().call(() ->
tasksService.deleteTask(
new ITasksService.DeleteTaskRequest()
.setTaskId(taskId)
.setVersion(task.getVersion())
)
));

await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty());
assertEquals(0, getFailedDeletionCount());
assertEquals(1, getTaskDeletedCount());
}

@Test
void taskWillNotBeDeletedIfVersionHasAlreadyChanged() {
final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount();
final UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I want to be deleted too!"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());

var task = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertFalse(
transactionsHelper.withTransaction().asNew().call(() ->
tasksService.deleteTask(
new ITasksService.DeleteTaskRequest()
.setTaskId(taskId)
.setVersion(task.getVersion() - 1)
)
)
);

assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedDeletionCount());
assertEquals(0, getTaskDeletedCount());
}

@ParameterizedTest
@EnumSource(value = TaskStatus.class,
names = {"PROCESSING", "UNKNOWN"},
mode = EnumSource.Mode.EXCLUDE)
void taskWillBeDeletedForAnyStatusExceptProcessing(TaskStatus status) {
final long initialFailedNextEventTimeChangeCount = getFailedDeletionCount();
final UUID taskId = UuidUtils.generatePrefixCombUuid();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.addTask(new ITasksService.AddTaskRequest()
.setTaskId(taskId)
.setData(taskDataSerializer.serialize("I want to be deleted!"))
.setType("test").setRunAfterTime(ZonedDateTime.now().plusDays(12)))
);

await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty());
List<Task> tasks = testTasksService.getWaitingTasks("test", null);
Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow();

transactionsHelper.withTransaction().asNew().call(() ->
tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion()))
);

var updateTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

taskDao.setStatus(taskId, status, updateTask.getVersion());

var finalTask = tasksService.getTask(new GetTaskRequest().setTaskId(taskId));

assertTrue(
transactionsHelper.withTransaction().asNew().call(() ->
tasksService.deleteTask(
new ITasksService.DeleteTaskRequest()
.setTaskId(taskId)
.setVersion(finalTask.getVersion())
)
)
);
assertEquals(initialFailedNextEventTimeChangeCount, getFailedDeletionCount());
assertEquals(1, getTaskDeletedCount());
}

private long getFailedDeletionCount() {
Counter counter = meterRegistry.find("twTasks.tasks.failedDeletionCount").tags(
"taskType", "test"
).counter();

if (counter == null) {
return 0;
} else {
return (long) counter.count();
}
}

private long getTaskDeletedCount() {
Counter counter = meterRegistry.find("twTasks.tasks.deletedCount").tags(
"taskType", "test"
).counter();

if (counter == null) {
return 0;
} else {
return (long) counter.count();
}
}
}
Loading