diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d391688..6cf07a4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -#### 1.44.0 - 2024/10/16 +## 1.45.0 - 2024/10/30 + +### Changed + +- Context switches are reduced. +- MDC values are cleared more aggressively. + +## 1.44.0 - 2024/10/16 - When registering cron tasks, log error if job already exists, but task is in error state. - - If silent mode is turned on, then this log will not appear. +- If silent mode is turned on, then this log will not appear. -#### 1.43.0 - 2024/08/09 +## 1.43.0 - 2024/08/09 - Added support for task context @@ -34,7 +41,7 @@ ALTER TABLE tw_task_data WAIT 2 LOCK = NONE; ``` -#### 1.42.0 - 2024/07/16 +## 1.42.0 - 2024/07/16 ### Added @@ -44,48 +51,48 @@ ALTER TABLE tw_task_data WAIT 2 - Support for spring boot 3.1 and 2.7 versions. -#### 1.41.6 - 2024/04/17 +## 1.41.6 - 2024/04/17 ### Added - `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with extreme amount of tasks might benefit from this. -#### 1.41.5 - 2024/04/05 +## 1.41.5 - 2024/04/05 ### Changed * Use static methods to create BeanPostProcessors. -#### 1.41.4 - 2024/04/02 +## 1.41.4 - 2024/04/02 ### Changed - `/getTaskTypes` endpoint accepts optional query parameter `status` to filter only types of tasks in the particular status(es). - Fixed a bug with `taskType` and `taskSubType` filters on query endpoints when multiple values are supplied, where it would consider only one value. -#### 1.41.3 - 2024/02/29 +## 1.41.3 - 2024/02/29 ### Changed * Add compatibility with Spring Boot 3.2. 
* Update dependencies -#### 1.41.2 - 2024/02/16 +## 1.41.2 - 2024/02/16 ### Changed * Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet. -#### 1.41.1 - 2023/12/19 +## 1.41.1 - 2023/12/19 ### Changed - When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum, rather than providing the `HttpStatus` directly, to handle binary incompatibility between Spring 5 and 6 causing NoSuchMethod errors when tw-tasks is used with Spring 6 -#### 1.41.0 - 2023/11/16 +## 1.41.0 - 2023/11/16 ### Added @@ -98,19 +105,19 @@ ALTER TABLE tw_task_data WAIT 2 * NullPointerException in TaskManagementService.getTaskData in case task is not found -#### 1.40.5 - 2023/10/30 +## 1.40.5 - 2023/10/30 ### Added - Setting METADATA_MAX_AGE_CONFIG to two minutes for producer -#### 1.40.4 - 2023/10/06 +## 1.40.4 - 2023/10/06 ### Fixed * Monitoring queries for Postgres finding approximate table sizes in the databases were using a wrong schema and thus no records were found. -#### 1.40.3 - 2023/08/01 +## 1.40.3 - 2023/08/01 ### Added @@ -122,26 +129,26 @@ ALTER TABLE tw_task_data WAIT 2 * Build against Spring Boot 2.7.11 --> 2.7.14 * Build against Spring Boot 2.6.14 --> 2.6.15 -#### 1.40.2 - 2023/07/14 +## 1.40.2 - 2023/07/14 ### Added * introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task types in your service. -#### 1.40.1 - 2023/07/12 +## 1.40.1 - 2023/07/12 ### Fixed * `commitSync` operation sometimes reporting a WakeupException. 
-#### 1.40.0 - 2023/06/12 +## 1.40.0 - 2023/06/12 ### Added * CronJob annotation for Spring bean's methods -#### 1.39.2 - 2023/06/06 +## 1.39.2 - 2023/06/06 ### Fixed diff --git a/TODO.md b/TODO.md index 49ee895e..3aef293a 100644 --- a/TODO.md +++ b/TODO.md @@ -27,3 +27,10 @@ topics configurations. Could refactor the properties to more hierarhical structu 20. Add metric for how long it takes a task from adding to processing. Or scheduling time to processing. 23. Start using Avro or other binary messages for triggering queue. This Json crap is expensive? + +25. Check automatically if the concurrency policy returned is the same instance. +Unfortunately, it is quite common for services to create a new instance every time we ask for one, for example of SimpleConcurrencyPolicy, and +by doing that they lose any concurrency control. +We could still allow returning a separate instance if, for example, a special property is set. + +26. Add DSL from Wkp example. \ No newline at end of file diff --git a/build.common.gradle b/build.common.gradle index 974716d3..6afffc06 100644 --- a/build.common.gradle +++ b/build.common.gradle @@ -6,7 +6,6 @@ group = "com.transferwise.tasks" apply plugin: "java-library" apply plugin: "checkstyle" apply plugin: "idea" -apply plugin: "com.github.spotbugs" apply from: "../build.libraries.gradle" @@ -16,7 +15,7 @@ repositories { } configurations { - all { + configureEach { exclude(group: 'junit', module: 'junit') exclude(group: 'org.junit.vintage', module: 'junit-vintage-engine') @@ -134,7 +133,6 @@ test { } tasks.findAll { it.name.startsWith("spotbugs") }*.configure { - effort = "max" excludeFilter = file('../spotbugs-exclude.xml') reports { xml.required = true @@ -143,7 +141,7 @@ tasks.findAll { it.name.startsWith("spotbugs") }*.configure { } -tasks.withType(Checkstyle) { +tasks.withType(Checkstyle).configureEach { config = resources.text.fromFile(file('../google_checks.xml')) maxWarnings = 0 diff --git a/build.gradle b/build.gradle index 
53ae99ed..8af887f6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,20 +1,22 @@ import org.eclipse.jgit.api.errors.RefAlreadyExistsException +import com.github.spotbugs.snom.Confidence +import com.github.spotbugs.snom.Effort buildscript { if (!project.hasProperty("springBootVersion")) { ext.springBootVersion = System.getenv("SPRING_BOOT_VERSION") ?: "3.2.2" } dependencies { - classpath "com.avast.gradle:gradle-docker-compose-plugin:0.16.4" + classpath "com.avast.gradle:gradle-docker-compose-plugin:0.17.10" } } plugins { - id "com.github.spotbugs" version "5.0.14" apply false + id "com.github.spotbugs" version "6.0.+" id 'org.springframework.boot' version "$springBootVersion" apply false id "idea" - id 'org.ajoberstar.grgit' version '5.2.0' - id 'io.github.gradle-nexus.publish-plugin' version "1.1.0" + id 'org.ajoberstar.grgit' version '5.3.0' + id 'io.github.gradle-nexus.publish-plugin' version "2.0.0" } @@ -36,7 +38,7 @@ idea.module { excludeDirs += file('logs2') } -task tagRelease { +tasks.register('tagRelease') { doLast { try { grgit.tag.add { @@ -65,3 +67,9 @@ nexusPublishing { tasks.findByName("initializeSonatypeStagingRepository").setOnlyIf { System.getenv("OSS_SIGNING_KEY") } + +spotbugs { + effort = Effort.valueOf('MAX') + reportLevel = Confidence.valueOf('DEFAULT') +} + diff --git a/build.libraries.gradle b/build.libraries.gradle index f6da85ae..324c17df 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -1,13 +1,13 @@ ext { libraries = [ // version defined - awaitility : 'org.awaitility:awaitility:4.2.0', + awaitility : 'org.awaitility:awaitility:4.2.2', twGafferJtaStarter : 'com.transferwise.common:tw-gaffer-jta-starter:2.2.0', twGafferJtaJakartaStarter : 'com.transferwise.common:tw-gaffer-jta-jakarta-starter:3.0.0', - apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.5.0", + apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.7.1", apacheCommonsCollections : "org.apache.commons:commons-collections4:4.4", 
commonsIo : "commons-io:commons-io:2.15.1", - guava : "com.google.guava:guava:33.0.0-jre", + guava : "com.google.guava:guava:33.3.1-jre", jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", semver4j : "com.vdurmont:semver4j:3.1.0", @@ -20,10 +20,10 @@ ext { twIncidents : 'com.transferwise.common:tw-incidents:1.2.2', twLeaderSelector : "com.transferwise.common:tw-leader-selector:1.10.2", twLeaderSelectorStarter : "com.transferwise.common:tw-leader-selector-starter:1.10.2", - twBaseUtils : "com.transferwise.common:tw-base-utils:1.12.4", + twBaseUtils : "com.transferwise.common:tw-base-utils:1.13.0", twEntryPointsStarter : 'com.transferwise.common:tw-entrypoints-starter:2.16.0', twSpyqlStarter : 'com.transferwise.common:tw-spyql-starter:1.6.2', - lubenZstd : 'com.github.luben:zstd-jni:1.5.5-2', + lubenZstd : 'com.github.luben:zstd-jni:1.5.6-7', lz4Java : 'org.lz4:lz4-java:1.8.0', // versions managed by spring-boot-dependencies platform diff --git a/demoapp/docker/docker-compose.yml b/demoapp/docker/docker-compose.yml index 18412d30..ac3d83bd 100644 --- a/demoapp/docker/docker-compose.yml +++ b/demoapp/docker/docker-compose.yml @@ -61,7 +61,7 @@ services: # --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --transaction-isolation=READ-COMMITTED" mariadb: - image: mariadb:10.6 + image: mariadb:10.11 hostname: mysql ports: - "13307:3306" @@ -78,7 +78,6 @@ services: --innodb_sync_array_size=16 --innodb_log_file_size=10g --query_cache_type=0 - --innodb_adaptive_hash_index=0 --collation-server=utf8mb4_unicode_ci --transaction-isolation=READ-COMMITTED --innodb_autoinc_lock_mode=2 " diff --git a/gradle.properties b/gradle.properties index a2ddf214..c0ccaa5f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.44.0 +version=1.45.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/gradle/wrapper/gradle-wrapper.properties 
b/gradle/wrapper/gradle-wrapper.properties index fae08049..1e2fbf0d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java b/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java new file mode 100644 index 00000000..5a8a1e2a --- /dev/null +++ b/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java @@ -0,0 +1,43 @@ +package com.transferwise.tasks.core.autoconfigure; + +import com.transferwise.tasks.helpers.CoreMetricsTemplate; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.env.EnvironmentPostProcessor; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; + +/** + * Registers the tw-tasks gauges for which tw-observability-base calculates last minute min/max values (local extremums). + * + * <p>The gauge names are published through a property source that is added last and at most once per environment. + */ +public class TwTasksEnvironmentPostProcessor implements EnvironmentPostProcessor { + + private static final String PROPERTY_SOURCE_KEY = TwTasksEnvironmentPostProcessor.class.getName(); + static final String TW_OBS_BASE_EXTREMUM_CONFIG_PATH = "transferwise.observability.base.metrics.local-extremum-gauge-names.tw-tasks"; + + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { 
+ if (environment.getPropertySources().contains(PROPERTY_SOURCE_KEY)) { + // Our property source is already registered, nothing to do. + return; + } + + // Calculate last minute min/max using tw-observability-base local extremums. + Set<String> gaugeNames = new HashSet<>(); + gaugeNames.add(CoreMetricsTemplate.GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_RUNNING_TASKS_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT); + + Map<String, Object> properties = new HashMap<>(); + properties.put(TW_OBS_BASE_EXTREMUM_CONFIG_PATH, gaugeNames); + + // Added last, i.e. with the lowest precedence among the environment's property sources. + environment.getPropertySources().addLast(new MapPropertySource(PROPERTY_SOURCE_KEY, properties)); + } +} diff --git a/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories b/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories index 46ceed1a..bbc40bdf 100644 --- a/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -1,2 +1,3 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.transferwise.tasks.core.autoconfigure.TwTasksCoreAutoConfiguration org.springframework.context.ApplicationListener=com.transferwise.tasks.TwTasksApplicationListener +org.springframework.boot.env.EnvironmentPostProcessor=com.transferwise.tasks.core.autoconfigure.TwTasksEnvironmentPostProcessor diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/buckets/BucketsManager.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/buckets/BucketsManager.java index cbc98932..4a0b2fb4 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/buckets/BucketsManager.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/buckets/BucketsManager.java @@ -11,6 +11,7 @@ import 
java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -58,7 +59,9 @@ protected void registerUniqueBucketIds() { for (String bucketId : bucketIds) { globalProcessingState.getBuckets().put(bucketId, new GlobalProcessingState.Bucket(priorityManager.getHighestPriority(), priorityManager.getLowestPriority()) - .setBucketId(bucketId)); + .setBucketId(bucketId) + .setTasksGrabbingSemaphore(new Semaphore(tasksProperties.getTaskGrabbingMaxConcurrency())) + ); } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 35fc50c6..3b074e78 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -99,7 +99,6 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String insertTaskSql; protected String insertUniqueTaskKeySql; protected String insertTaskDataSql; - protected String insertTaskContext; protected String setToBeRetriedSql; protected String setToBeRetriedSql1; protected String grabForProcessingWithStatusAssertionSql; @@ -286,7 +285,7 @@ public InsertTaskResponse insertTask(InsertTaskRequest request) { SerializedData serializedContext = taskDataSerializer.serialize(contextBlob, request.getCompression()); jdbcTemplate.update( insertTaskDataSql, - args(taskId, Integer.valueOf(serializedData.getDataFormat()), serializedData.getData(), serializedContext.getDataFormat(), + args(taskId, serializedData.getDataFormat(), serializedData.getData(), serializedContext.getDataFormat(), serializedContext.getData()) ); if (request.getData() != null) { diff --git 
a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsService.java index 43220cb0..fffb7d0e 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/EntryPointsService.java @@ -10,12 +10,21 @@ public class EntryPointsService implements IEntryPointsService { @Autowired private UnitOfWorkManager unitOfWorkManager; + @Autowired + private MdcService mdcService; + @Override public <T> T continueOrCreate(String group, String name, Supplier<T> supplier) { if (TwContext.current().isEntryPoint()) { - return supplier.get(); + return mdcService.with(() -> supplier.get()); } - return unitOfWorkManager.createEntryPoint(group, name).toContext().execute(supplier); + return createEntrypoint(group, name, supplier); + } + + @Override + public <T> T createEntrypoint(String group, String name, Supplier<T> supplier) { + // Run the supplier inside the MDC scope; passing "() -> supplier" to with() only hands the supplier back unscoped. + return unitOfWorkManager.createEntryPoint(group, name).toContext().execute(() -> mdcService.with(() -> supplier.get())); } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IEntryPointsService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IEntryPointsService.java index 2b9b79a8..d5f4a671 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IEntryPointsService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IEntryPointsService.java @@ -5,4 +5,6 @@ public interface IEntryPointsService { <T> T continueOrCreate(String group, String name, Supplier<T> supplier); + + <T> T createEntrypoint(String group, String name, Supplier<T> supplier); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IMdcService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IMdcService.java index 428fafa2..f27756eb 100644 --- 
a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IMdcService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/IMdcService.java @@ -16,7 +16,7 @@ public interface IMdcService { void put(@NonNull ITask task); - void put(@NonNull IBaseTask task); + void put(IBaseTask task); void put(UUID taskId, Long taskVersion); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/MdcService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/MdcService.java index 9dc9841d..c67949f7 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/MdcService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/entrypoints/MdcService.java @@ -28,7 +28,10 @@ public void put(@NonNull ITask task) { } @Override - public void put(@NonNull IBaseTask task) { + public void put(IBaseTask task) { + if (task == null) { + return; + } put(task.getId(), task.getVersion()); putType(task.getType()); } @@ -91,7 +94,8 @@ public <T> T with(Callable<T> callable) { if (previousMap != null && previousMap.containsKey(key)) { MDC.put(key, previousMap.get(key)); } else { - MDC.remove(key); + // We don't remove so we can have a sticky mdc keys visible for when an exception is logged outside. 
+ // MDC.remove(key); } } keysSet.remove(); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 4d601e4d..70dcae0c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -29,70 +29,70 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { - private static final String METRIC_LIBRARY_INFO = "tw.library.info"; - - private static final String METRIC_TASKS_MARKED_AS_ERROR_COUNT = METRIC_PREFIX + "tasks.markedAsErrorCount"; - private static final String METRIC_TASKS_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.processingsCount"; - private static final String METRIC_TASKS_ONGOING_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.ongoingProcessingsCount"; - private static final String METRIC_TASKS_PROCESSED_COUNT = METRIC_PREFIX + "tasks.processedCount"; - private static final String METRIC_TASKS_PROCESSING_TIME = METRIC_PREFIX + "tasks.processingTime"; - private static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; - private static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; - private static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; - private static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; - private static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; - private static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; - private static final String METRIC_TASKS_RESUMER_SCHEDULED_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.scheduledTasks.resumedCount"; - private 
static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_FAILED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markFailedCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_IGNORED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.ignoredCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.resumedCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_ERROR_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markErrorCount"; - private static final String METRIC_TASKS_FAILED_GRABBINGS_COUNT = METRIC_PREFIX + "tasks.failedGrabbingsCount"; - private static final String METRIC_TASKS_RETRIES_COUNT = METRIC_PREFIX + "tasks.retriesCount"; - private static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; - private static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; - private static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; - private static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; - private static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; - private static final String METRIC_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; - private static final String METRIC_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.activeTriggeringsCount"; - private static final String METRIC_BUCKETS_MANAGER_BUCKETS_COUNT = METRIC_PREFIX + "bucketsManager.bucketsCount"; - private static final String METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT = METRIC_PREFIX + "processing.ongoingTasksGrabbingsCount"; - private static final String METRIC_TASKS_CLEANER_DELETABLE_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletableTasksCount"; - private static final String 
METRIC_TASKS_CLEANER_DELETED_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTasksCount"; - private static final String METRIC_TASKS_CLEANER_DELETED_UNIQUE_KEYS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedUniqueKeysCount"; - private static final String METRIC_TASKS_CLEANER_DELETED_TASK_DATAS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTaskDatasCount"; - private static final String METRIC_TASKS_CLEANER_DELETE_LAG_SECONDS = METRIC_PREFIX + "tasksCleaner.deleteLagSeconds"; - private static final String METRIC_DAO_DATA_SIZE = METRIC_PREFIX + "dao.data.size"; - private static final String METRIC_DAO_DATA_SERIALIZED_SIZE = METRIC_PREFIX + "dao.data.serialized.size"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT = + private static final String GAUGE_LIBRARY_INFO = "tw.library.info"; + + public static final String METRIC_TASKS_MARKED_AS_ERROR_COUNT = METRIC_PREFIX + "tasks.markedAsErrorCount"; + public static final String METRIC_TASKS_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.processingsCount"; + public static final String GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.ongoingProcessingsCount"; + public static final String METRIC_TASKS_PROCESSED_COUNT = METRIC_PREFIX + "tasks.processedCount"; + public static final String METRIC_TASKS_PROCESSING_TIME = METRIC_PREFIX + "tasks.processingTime"; + public static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; + public static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; + public static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; + public static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; + public static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; + public static final String 
METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; + public static final String METRIC_TASKS_RESUMER_SCHEDULED_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.scheduledTasks.resumedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_FAILED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markFailedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_IGNORED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.ignoredCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.resumedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_ERROR_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markErrorCount"; + public static final String METRIC_TASKS_FAILED_GRABBINGS_COUNT = METRIC_PREFIX + "tasks.failedGrabbingsCount"; + public static final String METRIC_TASKS_RETRIES_COUNT = METRIC_PREFIX + "tasks.retriesCount"; + public static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; + public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; + public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; + public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; + public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; + public static final String GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; + public static final String GAUGE_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.activeTriggeringsCount"; + public static final String METRIC_BUCKETS_MANAGER_BUCKETS_COUNT = METRIC_PREFIX + "bucketsManager.bucketsCount"; + public static final String GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT = 
METRIC_PREFIX + "processing.ongoingTasksGrabbingsCount"; + public static final String METRIC_TASKS_CLEANER_DELETABLE_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletableTasksCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTasksCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_UNIQUE_KEYS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedUniqueKeysCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_TASK_DATAS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTaskDatasCount"; + public static final String GAUGE_TASKS_CLEANER_DELETE_LAG_SECONDS = METRIC_PREFIX + "tasksCleaner.deleteLagSeconds"; + public static final String METRIC_DAO_DATA_SIZE = METRIC_PREFIX + "dao.data.size"; + public static final String METRIC_DAO_DATA_SERIALIZED_SIZE = METRIC_PREFIX + "dao.data.serialized.size"; + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.pollingBucketsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_RECEIVED_TRIGGERS_COUNT = + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_RECEIVED_TRIGGERS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.receivedTriggersCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_COMMITS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.commitsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSET_ALREADY_COMMITTED = + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_COMMITS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.commitsCount"; + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSET_ALREADY_COMMITTED = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetAlreadyCommitted"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT = + public static final String 
GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsToBeCommitedCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsCompletedCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.unprocessedFetchedRecordsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsCount"; - private static final String METRIC_HEALTH_TASKS_IN_ERROR_COUNT = METRIC_PREFIX + "health.tasksInErrorCount"; - private static final String METRIC_HEALTH_STUCK_TASKS_COUNT = METRIC_PREFIX + "health.stuckTasksCount"; - private static final String METRIC_HEALTH_TASK_HISTORY_LENGTH_SECONDS = METRIC_PREFIX + "health.tasksHistoryLengthSeconds"; - private static final String METRIC_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE = METRIC_PREFIX + "health.tasksInErrorCountPerType"; - private static final String METRIC_HEALTH_STUCK_TASKS_COUNT_PER_TYPE = METRIC_PREFIX + "health.stuckTasksCountPerType"; - private static final String METRIC_STATE_APPROXIMATE_TASKS = METRIC_PREFIX + "state.approximateTasks"; - private static final String METRIC_STATE_APPROXIMATE_UNIQUE_KEYS = METRIC_PREFIX + "state.approximateUniqueKeys"; - private static final String METRIC_STATE_APPROXIMATE_TASK_DATAS = METRIC_PREFIX + "state.approximateTaskDatas"; - private static final String METRIC_PROCESSING_TYPE_TRIGGERS_COUNT = METRIC_PREFIX + "processing.typeTriggersCount"; - private static final String 
METRIC_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount"; - private static final String METRIC_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount"; - private static final String METRIC_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount"; - private static final String METRIC_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion"; + public static final String GAUGE_METRIC_HEALTH_TASKS_IN_ERROR_COUNT = METRIC_PREFIX + "health.tasksInErrorCount"; + public static final String GAUGE_HEALTH_STUCK_TASKS_COUNT = METRIC_PREFIX + "health.stuckTasksCount"; + public static final String GAUGE_HEALTH_TASK_HISTORY_LENGTH_SECONDS = METRIC_PREFIX + "health.tasksHistoryLengthSeconds"; + public static final String GAUGE_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE = METRIC_PREFIX + "health.tasksInErrorCountPerType"; + public static final String GAUGE_HEALTH_STUCK_TASKS_COUNT_PER_TYPE = METRIC_PREFIX + "health.stuckTasksCountPerType"; + public static final String GAUGE_METRIC_STATE_APPROXIMATE_TASKS = METRIC_PREFIX + "state.approximateTasks"; + public static final String GAUGE_STATE_APPROXIMATE_UNIQUE_KEYS = METRIC_PREFIX + "state.approximateUniqueKeys"; + public static final String GAUGE_STATE_APPROXIMATE_TASK_DATAS = METRIC_PREFIX + "state.approximateTaskDatas"; + public static final String GAUGE_PROCESSING_TYPE_TRIGGERS_COUNT = METRIC_PREFIX + "processing.typeTriggersCount"; + public static final String GAUGE_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount"; + public static final String GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount"; + public static final String GAUGE_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount"; + public static final String GAUGE_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion"; private static final String 
TAG_PROCESSING_RESULT = "processingResult"; private static final String TAG_FROM_STATUS = "fromStatus"; @@ -137,10 +137,10 @@ public void registerTaskProcessingStart(String bucketId, String taskType) { meterCache.counter(METRIC_TASKS_PROCESSINGS_COUNT, TagsSet.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType)) .increment(); - taskProcessingGauges.computeIfAbsent(Triple.of(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType), (t) -> { + taskProcessingGauges.computeIfAbsent(Triple.of(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType), (t) -> { AtomicInteger counter = new AtomicInteger(0); meterCache.getMeterRegistry() - .gauge(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, Tags.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType), counter); + .gauge(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, Tags.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType), counter); return counter; }).incrementAndGet(); } @@ -154,7 +154,7 @@ public void registerTaskProcessingEnd(String bucketId, String taskType, long pro meterCache.timer(METRIC_TASKS_PROCESSING_TIME, TagsSet.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType, TAG_PROCESSING_RESULT, processingResult)) .record(TwContextClockHolder.getClock().millis() - processingStartTimeMs, TimeUnit.MILLISECONDS); - taskProcessingGauges.get(Triple.of(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType)) + taskProcessingGauges.get(Triple.of(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType)) .decrementAndGet(); } @@ -291,25 +291,25 @@ TAG_DATA_SIZE, getDataSizeBucket(data) @Override public void registerInProgressTriggeringsCount(AtomicInteger count) { - Gauge.builder(METRIC_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT, count::get) + Gauge.builder(GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerActiveTriggeringsCount(AtomicInteger count) { - 
Gauge.builder(METRIC_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT, count::get) + Gauge.builder(GAUGE_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerOngoingTasksGrabbingsCount(AtomicInteger count) { - Gauge.builder(METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT, count::get) + Gauge.builder(GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerPollingBucketsCount(AtomicInteger count) { - Gauge.builder(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT, count::get) + Gauge.builder(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @@ -358,7 +358,7 @@ public void registerKafkaTasksExecutionTriggererAlreadyCommitedOffset(String buc @Override public Object registerTasksCleanerDeleteLagSeconds(TaskStatus status, AtomicLong lagInSeconds) { - return registerGauge(METRIC_TASKS_CLEANER_DELETE_LAG_SECONDS, lagInSeconds::get, TAG_TASK_STATUS, status.name()); + return registerGauge(GAUGE_TASKS_CLEANER_DELETE_LAG_SECONDS, lagInSeconds::get, TAG_TASK_STATUS, status.name()); } @Override @@ -373,90 +373,90 @@ public void unregisterMetric(Object rawMetricHandle) { @Override public Object registerTasksInErrorCount(AtomicInteger erroneousTasksCount) { - return registerGauge(METRIC_HEALTH_TASKS_IN_ERROR_COUNT, erroneousTasksCount::get); + return registerGauge(GAUGE_METRIC_HEALTH_TASKS_IN_ERROR_COUNT, erroneousTasksCount::get); } @Override public Object registerTasksInErrorCount(String taskType, AtomicInteger count) { - return registerGauge(METRIC_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE, count::get, TAG_TASK_TYPE, taskType); + return registerGauge(GAUGE_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE, count::get, TAG_TASK_TYPE, taskType); } @Override public Object registerStuckTasksCount(AtomicInteger stuckTasksCount) { - return 
registerGauge(METRIC_HEALTH_STUCK_TASKS_COUNT, stuckTasksCount::get); + return registerGauge(GAUGE_HEALTH_STUCK_TASKS_COUNT, stuckTasksCount::get); } @Override public Object registerStuckTasksCount(TaskStatus status, String type, AtomicInteger count) { - return registerGauge(METRIC_HEALTH_STUCK_TASKS_COUNT_PER_TYPE, count::get, TAG_TASK_STATUS, status.name(), TAG_TASK_TYPE, type); + return registerGauge(GAUGE_HEALTH_STUCK_TASKS_COUNT_PER_TYPE, count::get, TAG_TASK_STATUS, status.name(), TAG_TASK_TYPE, type); } @Override public Object registerApproximateTasksCount(AtomicLong approximateTasksCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_TASKS, approximateTasksCount::get); + return registerGauge(GAUGE_METRIC_STATE_APPROXIMATE_TASKS, approximateTasksCount::get); } @Override public Object registerApproximateUniqueKeysCount(AtomicLong approximateUniqueKeysCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_UNIQUE_KEYS, approximateUniqueKeysCount::get); + return registerGauge(GAUGE_STATE_APPROXIMATE_UNIQUE_KEYS, approximateUniqueKeysCount::get); } @Override public Object registerApproximateTaskDatasCount(AtomicLong approximateTaskDatasCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_TASK_DATAS, approximateTaskDatasCount::get); + return registerGauge(GAUGE_STATE_APPROXIMATE_TASK_DATAS, approximateTaskDatasCount::get); } @Override public Object registerTaskHistoryLength(TaskStatus status, AtomicLong lengthInSeconds) { - return registerGauge(METRIC_HEALTH_TASK_HISTORY_LENGTH_SECONDS, lengthInSeconds::get, TAG_TASK_STATUS, status.name()); + return registerGauge(GAUGE_HEALTH_TASK_HISTORY_LENGTH_SECONDS, lengthInSeconds::get, TAG_TASK_STATUS, status.name()); } @Override public Object registerProcessingTriggersCount(String bucketId, String taskType, Supplier<Number> countSupplier) { - return registerGauge(METRIC_PROCESSING_TYPE_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId), + return 
registerGauge(GAUGE_PROCESSING_TYPE_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType); } @Override public Object registerProcessingTriggersCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerRunningTasksCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerInProgressTasksGrabbingCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerProcessingStateVersion(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_PROCESSING_STATE_VERSION, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_STATE_VERSION, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererOffsetsToBeCommitedCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT, countSupplier, TAG_BUCKET_ID, + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object 
registerKafkaTasksExecutionTriggererOffsetsCompletedCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererUnprocessedFetchedRecordsCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT, countSupplier, TAG_BUCKET_ID, + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererOffsetsCount(String bucketId, Supplier<Number> countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override @@ -471,7 +471,7 @@ public void registerLibrary() { version = "Unknown"; } - Gauge.builder(METRIC_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tasks-core") + Gauge.builder(GAUGE_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tasks-core") .description("Provides metadata about the library, for example the version.") .register(meterCache.getMeterRegistry()); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java index 1350b7ec..a8b2e4ac 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java +++ 
b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java @@ -8,6 +8,8 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -39,8 +41,7 @@ public static class Bucket { private AtomicInteger size = new AtomicInteger(); private AtomicInteger runningTasksCount = new AtomicInteger(); private AtomicInteger inProgressTasksGrabbingCount = new AtomicInteger(); - private Lock tasksGrabbingLock = new ReentrantLock(); - private Condition tasksGrabbingCondition = tasksGrabbingLock.newCondition(); + private Semaphore tasksGrabbingSemaphore; public Bucket(int minPriority, int maxPriority) { for (int i = minPriority; i <= maxPriority; i++) { @@ -48,12 +49,22 @@ public Bucket(int minPriority, int maxPriority) { } } + // Optimization to avoid waiting behind a lock, but also reducing the amount of signals and context switches. + // One ongoing version update is enough to wake up all necessary components. + private AtomicBoolean versionUpdateInProgress = new AtomicBoolean(); + public void increaseVersion() { + if (versionUpdateInProgress.getAndSet(true)) { + return; + } + versionLock.lock(); try { version.incrementAndGet(); versionCondition.signalAll(); } finally { + // Needs to happen before we unlock. 
+ versionUpdateInProgress.getAndSet(false); versionLock.unlock(); } } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java index 13625bba..9736ab60 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java @@ -299,7 +299,6 @@ protected void markAsError(IBaseTask task, String bucketId) { protected ProcessTaskResponse grabTaskForProcessing(String bucketId, BaseTask task) { GlobalProcessingState.Bucket bucket = globalProcessingState.getBuckets().get(bucketId); - BucketProperties bucketProperties = bucketsManager.getBucketProperties(bucketId); ITaskHandler taskHandler = taskHandlerRegistry.getTaskHandler(task); if (taskHandler == null) { @@ -335,15 +334,8 @@ protected ProcessTaskResponse grabTaskForProcessing(String bucketId, BaseTask ta } try { - bucket.getTasksGrabbingLock().lock(); - try { - while (bucket.getInProgressTasksGrabbingCount().incrementAndGet() > bucketProperties.getTaskGrabbingMaxConcurrency()) { - bucket.getInProgressTasksGrabbingCount().decrementAndGet(); - boolean ignored = bucket.getTasksGrabbingCondition().await(tasksProperties.getGenericMediumDelay().toMillis(), TimeUnit.MILLISECONDS); - } - } finally { - bucket.getTasksGrabbingLock().unlock(); - } + bucket.getTasksGrabbingSemaphore().acquire(); + bucket.getInProgressTasksGrabbingCount().incrementAndGet(); ongoingTasksGrabbingsCount.incrementAndGet(); tasksGrabbingExecutor.submit(() -> { try { @@ -402,15 +394,8 @@ protected void grabTaskForProcessing0(GlobalProcessingState.Bucket bucket, BaseT bucket.getSize().decrementAndGet(); bucket.increaseVersion(); - Lock tasksGrabbingLock = bucket.getTasksGrabbingLock(); - tasksGrabbingLock.lock(); - try { - bucket.getInProgressTasksGrabbingCount().decrementAndGet(); - 
bucket.getTasksGrabbingCondition().signalAll(); - } finally { - tasksGrabbingLock.unlock(); - } - + bucket.getInProgressTasksGrabbingCount().decrementAndGet(); + bucket.getTasksGrabbingSemaphore().release(); ongoingTasksGrabbingsCount.decrementAndGet(); } }); @@ -761,7 +746,7 @@ public void startProcessing() { if (waitTimeMs > 0) { try { - bucket.getVersionCondition().await(waitTimeMs, TimeUnit.MILLISECONDS); + var ignored = bucket.getVersionCondition().await(waitTimeMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error(e.getMessage(), e); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java index b9a864fc..69760124 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java @@ -35,12 +35,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -77,6 +80,7 @@ @Slf4j public class KafkaTasksExecutionTriggerer implements ITasksExecutionTriggerer, GracefulShutdownStrategy, InitializingBean { + private static final int MAX_KAFKA_PRODUCER_INSTANTIATION_ATTEMPTS = 5; private static final int KAFKA_PRODUCER_INSTANTIATION_FAILURE_WAIT_TIME_MS = 500; @Autowired @@ -114,6 +118,7 @@ public class KafkaTasksExecutionTriggerer implements 
ITasksExecutionTriggerer, G private final AtomicInteger pollingBucketsCount = new AtomicInteger(); private final Lock lifecycleLock = new ReentrantLock(); + @Override public void afterPropertiesSet() { executorService = executorsHelper.newCachedExecutor("ktet"); @@ -125,11 +130,10 @@ public void afterPropertiesSet() { } String bucketId = taskTriggering.getBucketId(); - ConsumerBucket consumerBucket = consumerBuckets.get(bucketId); - TopicPartition topicPartition = taskTriggering.getTopicPartition(); - long offset = taskTriggering.getOffset(); + var consumerBucket = consumerBuckets.get(bucketId); + consumerBucket.finishedTaskTriggerings.add(taskTriggering); - releaseCompletedOffset(consumerBucket, topicPartition, offset); + releaseCompletedOffsetsIfNoOneIsOnIt(consumerBucket); }); coreMetricsTemplate.registerPollingBucketsCount(pollingBucketsCount); @@ -345,43 +349,67 @@ void registerPolledOffset(ConsumerBucket consumerBucket, TopicPartition topicPar } } - void releaseCompletedOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long offset) { - consumerBucket.getOffsetsStorageLock().lock(); - // TODO: Lots of stuff inside this lock... - try { - ConsumerTopicPartition consumerTopicPartition = consumerBucket.getConsumerTopicPartitions().get(topicPartition); + void releaseCompletedOffsetsIfNoOneIsOnIt(ConsumerBucket consumerBucket) { + if (consumerBucket.finishedTaskTriggeringsProcessingInProgress.getAndSet(true)) { + return; + } - TreeSet<Long> offsets = consumerTopicPartition.getOffsets(); - if (!offsets.contains(offset)) { - // Theoretically possible, when we reconnect to Kafka and we had registered one offset multiple times - // (in this case there is only single record in TreeSet for it). 
+ try { + releaseCompletedOffsets(consumerBucket); + } finally { + consumerBucket.finishedTaskTriggeringsProcessingInProgress.getAndSet(false); + } + } - coreMetricsTemplate.registerKafkaTasksExecutionTriggererAlreadyCommitedOffset(consumerBucket.getBucketId()); - log.debug("Offset {} has already been commited.", offset); - return; - } - consumerTopicPartition.getOffsetsCompleted().put(offset, Boolean.TRUE); - - boolean isFirst = offsets.first() == offset; - if (isFirst) { - while (!offsets.isEmpty()) { - long firstOffset = offsets.first(); - if (consumerTopicPartition.isDone(firstOffset)) { - // From Kafka Docs - // Note: The committed offset should always be the offset of the next message that your application will read. - consumerBucket.getOffsetsToBeCommitted().put(topicPartition, new OffsetAndMetadata(firstOffset + 1)); - offsets.pollFirst(); - consumerTopicPartition.getOffsetsCompleted().remove(firstOffset); - } else { - break; - } + void releaseCompletedOffsets(ConsumerBucket consumerBucket) { + consumerBucket.getOffsetsStorageLock().lock(); + try { + while (true) { + var taskTriggering = consumerBucket.finishedTaskTriggerings.poll(); + if (taskTriggering == null) { + return; } + + TopicPartition topicPartition = taskTriggering.getTopicPartition(); + long offset = taskTriggering.getOffset(); + releaseCompletedOffset(consumerBucket, topicPartition, offset); } } finally { consumerBucket.getOffsetsStorageLock().unlock(); } } + void releaseCompletedOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long offset) { + ConsumerTopicPartition consumerTopicPartition = consumerBucket.getConsumerTopicPartitions().get(topicPartition); + + TreeSet<Long> offsets = consumerTopicPartition.getOffsets(); + if (!offsets.contains(offset)) { + // Theoretically possible, when we reconnect to Kafka, and we had registered one offset multiple times + // (in this case there is only single record in TreeSet for it). 
+ + coreMetricsTemplate.registerKafkaTasksExecutionTriggererAlreadyCommitedOffset(consumerBucket.getBucketId()); + log.debug("Offset {} has already been commited.", offset); + return; + } + consumerTopicPartition.getOffsetsCompleted().put(offset, Boolean.TRUE); + + boolean isFirst = offsets.first() == offset; + if (isFirst) { + while (!offsets.isEmpty()) { + long firstOffset = offsets.first(); + if (consumerTopicPartition.isDone(firstOffset)) { + // From Kafka Docs + // Note: The committed offset should always be the offset of the next message that your application will read. + consumerBucket.getOffsetsToBeCommitted().put(topicPartition, new OffsetAndMetadata(firstOffset + 1)); + offsets.pollFirst(); + consumerTopicPartition.getOffsetsCompleted().remove(firstOffset); + } else { + break; + } + } + } + } + private void commitSync(ConsumerBucket consumerBucket, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) { if (offsetsToCommit.isEmpty()) { return; @@ -391,6 +419,8 @@ private void commitSync(ConsumerBucket consumerBucket, Map<TopicPartition, Offse var success = false; try { + releaseCompletedOffsets(consumerBucket); + if (log.isDebugEnabled()) { log.debug("Sync-committing bucket '" + bucketId + "' offsets to Kafka: " + offsetsToCommit.entrySet().stream() .map(e -> e.getKey() + ":" + e.getValue().offset()).collect(Collectors.joining(", "))); @@ -444,6 +474,8 @@ private void commitOffsetsWithLowFrequency(ConsumerBucket consumerBucket) { .map(e -> e.getKey() + ":" + e.getValue().offset()).collect(Collectors.joining(", "))); } + releaseCompletedOffsets(consumerBucket); + consumerBucket.getKafkaConsumer().commitAsync(consumerBucket.getOffsetsToBeCommitted(), (map, e) -> { if (e != null) { coreMetricsTemplate.registerKafkaTasksExecutionTriggererCommit(bucketId, false, false); @@ -739,6 +771,8 @@ public static class ConsumerBucket { private Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommitted = new ConcurrentHashMap<>(); private int 
unprocessedFetchedRecordsCount; private boolean topicConfigured; + private Queue<TaskTriggering> finishedTaskTriggerings = new ConcurrentLinkedQueue<>(); + private AtomicBoolean finishedTaskTriggeringsProcessingInProgress = new AtomicBoolean(false); public int getOffsetsToBeCommitedCount() { return offsetsToBeCommitted.size(); diff --git a/tw-tasks-incidents/src/main/java/com/transferwise/tasks/health/TasksIncidentGenerator.java b/tw-tasks-incidents/src/main/java/com/transferwise/tasks/health/TasksIncidentGenerator.java index b3c4c001..20600b88 100644 --- a/tw-tasks-incidents/src/main/java/com/transferwise/tasks/health/TasksIncidentGenerator.java +++ b/tw-tasks-incidents/src/main/java/com/transferwise/tasks/health/TasksIncidentGenerator.java @@ -46,7 +46,7 @@ public List<Incident> getActiveIncidents() { errorIncident = new Incident() .setId("twTasks/error") .setMessage(buildDetailedErrorReport(erroneousTasksCountByType, "in ERROR")) - .setSummary("" + cnt + " tasks in ERROR state.") + .setSummary(cnt + " tasks in ERROR state.") .setMetaData(Collections.singletonMap(TASK_CNT_KEY, String.valueOf(cnt))); } } else { diff --git a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java index 755555ff..3e2122bc 100644 --- a/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java +++ b/tw-tasks-management/src/main/java/com/transferwise/tasks/management/TasksManagementService.java @@ -117,21 +117,22 @@ public ResumeTasksImmediatelyResponse resumeTasksImmediately(ResumeTasksImmediat @Transactional(rollbackFor = Exception.class) public ResumeTasksImmediatelyResponse resumeAllTasksImmediately(ResumeAllTasksImmediatelyRequest request) { return entryPointsHelper - .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.RESUME_ALL_IMMEDIATELY, () -> { - 
ResumeTasksImmediatelyResponse response = new ResumeTasksImmediatelyResponse(); + .continueOrCreate(ManagementEntryPointGroups.TW_TASKS_MANAGEMENT, ManagementEntryPointNames.RESUME_ALL_IMMEDIATELY, + () -> { + ResumeTasksImmediatelyResponse response = new ResumeTasksImmediatelyResponse(); - if (StringUtils.isEmpty(request.getTaskType())) { - return response; - } + if (StringUtils.isEmpty(request.getTaskType())) { + return response; + } - List<DaoTask1> tasksInError = managementTaskDao.getTasksInErrorStatus(request.getMaxCount(), List.of(request.getTaskType()), null); - List<TaskVersionId> taskVersionIdsToResume = tasksInError.stream() - .filter(t -> t.getType().equals(request.getTaskType())) - .map(t -> new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) - .collect(Collectors.toList()); + List<DaoTask1> tasksInError = managementTaskDao.getTasksInErrorStatus(request.getMaxCount(), List.of(request.getTaskType()), null); + List<TaskVersionId> taskVersionIdsToResume = tasksInError.stream() + .filter(t -> t.getType().equals(request.getTaskType())) + .map(t -> new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) + .collect(Collectors.toList()); - return resumeTasksImmediately(new ResumeTasksImmediatelyRequest().setTaskVersionIds(taskVersionIdsToResume)); - }); + return resumeTasksImmediately(new ResumeTasksImmediatelyRequest().setTaskVersionIds(taskVersionIdsToResume)); + }); } @Override @@ -143,10 +144,10 @@ public GetTasksInErrorResponse getTasksInError(GetTasksInErrorRequest request) { return new GetTasksInErrorResponse().setTasksInError( tasks.stream().map(t -> new GetTasksInErrorResponse.TaskInError() - .setErrorTime(t.getStateTime().toInstant()) - .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) - .setType(t.getType()) - .setSubType(t.getSubType())) + .setErrorTime(t.getStateTime().toInstant()) + .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) + .setType(t.getType()) + 
.setSubType(t.getSubType())) .collect(Collectors.toList())); }); } @@ -161,8 +162,8 @@ public GetTasksStuckResponse getTasksStuck(GetTasksStuckRequest request) { return new GetTasksStuckResponse().setTasksStuck( tasks.stream().map(t -> new GetTasksStuckResponse.TaskStuck() - .setStuckTime(t.getNextEventTime().toInstant()) - .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion()))) + .setStuckTime(t.getNextEventTime().toInstant()) + .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion()))) .collect(Collectors.toList())); }); } @@ -177,11 +178,11 @@ public GetTasksInProcessingOrWaitingResponse getTasksInProcessingOrWaiting(GetTa request.getMaxCount(), request.getTaskTypes(), request.getTaskSubTypes()); return new GetTasksInProcessingOrWaitingResponse().setTasksInProcessingOrWaiting( tasks.stream().map(t -> new GetTasksInProcessingOrWaitingResponse.TaskInProcessingOrWaiting() - .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) - .setType(t.getType()) - .setSubType(t.getSubType()) - .setStatus(t.getStatus()) - .setStateTime(t.getStateTime().toInstant())) + .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) + .setType(t.getType()) + .setSubType(t.getSubType()) + .setStatus(t.getStatus()) + .setStateTime(t.getStateTime().toInstant())) .collect(Collectors.toList())); }); } @@ -194,12 +195,12 @@ public GetTasksByIdResponse getTasksById(GetTasksByIdRequest request) { List<FullTaskRecord> tasks = managementTaskDao.getTasks(request.getTaskIds()); return new GetTasksByIdResponse().setTasks( tasks.stream().map(t -> new GetTasksByIdResponse.Task() - .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) - .setType(t.getType()) - .setSubType(t.getSubType()) - .setStatus(t.getStatus()) - .setStateTime(t.getStateTime().toInstant()) - ) + .setTaskVersionId(new TaskVersionId().setId(t.getId()).setVersion(t.getVersion())) + 
.setType(t.getType()) + .setSubType(t.getSubType()) + .setStatus(t.getStatus()) + .setStateTime(t.getStateTime().toInstant()) + ) .collect(Collectors.toList()) ); });