Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub)!: set max ack extension period to 60 minutes #3501

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,10 @@ public class GcpPubSubAutoConfiguration {

private final ApplicationContext applicationContext;

private ThreadPoolTaskScheduler globalScheduler;

private FlowControlSettings globalFlowControlSettings;

private RetrySettings globalRetrySettings;

private ExecutorProvider globalExecutorProvider;

private ObjectProvider<SelectiveSchedulerThreadNameProvider> selectiveSchedulerThreadNameProvider;

public GcpPubSubAutoConfiguration(
Expand Down Expand Up @@ -257,7 +253,6 @@ public SubscriberFactory defaultSubscriberFactory(
factory.setExecutorProvider(executorProvider.get());
}
factory.setExecutorProviderMap(this.executorProviderMap);
factory.setGlobalExecutorProvider(this.globalExecutorProvider);

factory.setCredentialsProvider(this.finalCredentialsProvider);
factory.setHeaderProvider(this.headerProvider);
Expand Down Expand Up @@ -464,13 +459,6 @@ public void registerSubscriberSettings() {
}

private void registerSubscriberThreadPoolSchedulerBeans(GenericApplicationContext context) {
Integer numThreads = getGlobalExecutorThreads();
this.globalScheduler =
createAndRegisterSchedulerBean(
numThreads,
"global-gcp-pubsub-subscriber",
"globalPubSubSubscriberThreadPoolScheduler",
context);
registerSelectiveSchedulerBeans(context);
}

Expand All @@ -494,11 +482,6 @@ private void registerExecutorProviderBeans(GenericApplicationContext context) {
if (context.containsBean("subscriberExecutorProvider")) {
return;
}
if (this.globalScheduler != null) {
this.globalExecutorProvider =
createAndRegisterExecutorProvider(
"globalSubscriberExecutorProvider", this.globalScheduler, context);
}
createAndRegisterSelectiveExecutorProvider(context);
}

Expand Down Expand Up @@ -559,10 +542,11 @@ private ThreadPoolTaskScheduler createAndRegisterSchedulerBean(
String threadName,
String beanName,
GenericApplicationContext context) {
ThreadPoolTaskScheduler scheduler = createThreadPoolTaskScheduler(executorThreads, threadName);
ThreadPoolTaskScheduler scheduler;
scheduler = executorThreads == null ? null : createThreadPoolTaskScheduler(executorThreads, threadName);
context.registerBeanDefinition(
beanName,
BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskScheduler.class, () -> scheduler)
BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskScheduler.class, () -> scheduler)
.getBeanDefinition());
return scheduler;
}
Expand Down Expand Up @@ -660,7 +644,6 @@ private void createAndRegisterSelectiveRetrySettings(GenericApplicationContext c
}

private Integer getGlobalExecutorThreads() {
Integer numThreads = this.gcpPubSubProperties.getSubscriber().getExecutorThreads();
return numThreads != null ? numThreads : PubSubConfiguration.DEFAULT_EXECUTOR_THREADS;
return this.gcpPubSubProperties.getSubscriber().getExecutorThreads();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode.Code;
Expand Down Expand Up @@ -144,6 +145,20 @@ void maxInboundMetadataSize_default() {
});
}

@Test
void defaultSubscriberFactory_noExecutorThreadsSet_usesClientDefault() {
contextRunner.run(
ctx -> {
DefaultSubscriberFactory defaultSubscriberFactory =
ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
Subscriber subscriber = defaultSubscriberFactory.createSubscriber("dead10cc", (message, consumer) -> {});
// we confirm that the created subscriber uses the default thread setting in the client (5 as of Jan 2025).
InstantiatingExecutorProvider executorProvider = (InstantiatingExecutorProvider) FieldUtils.readField(subscriber, "executorProvider", true);
Integer threadsPerChannel = (Integer) FieldUtils.readField(subscriber, "THREADS_PER_CHANNEL", true);
assertThat(executorProvider.getExecutorThreadCount()).isEqualTo(threadsPerChannel);
});
}

@Test
void retryableCodes_default() {
contextRunner.run(
Expand Down Expand Up @@ -283,59 +298,21 @@ void customExecutorProviderUsedWhenProvided() {
DefaultSubscriberFactory factory =
ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(factory.getExecutorProvider("name")).isSameAs(executorProvider);
assertThat(ctx.containsBean("globalSubscriberExecutorProvider")).isFalse();
assertThat(ctx.containsBean("subscriberExecutorProvider-name")).isFalse();
});
}

@Test
void threadPoolScheduler_noConfigurationSet_globalCreated() {
contextRunner.run(
ctx -> {
ThreadPoolTaskScheduler globalSchedulerBean =
(ThreadPoolTaskScheduler) ctx.getBean("globalPubSubSubscriberThreadPoolScheduler");

assertThat(FieldUtils.readField(globalSchedulerBean, "poolSize", true)).isEqualTo(4);
assertThat(globalSchedulerBean.getThreadNamePrefix())
.isEqualTo("global-gcp-pubsub-subscriber");
assertThat(globalSchedulerBean.isDaemon()).isTrue();
});
}

@Test
void subscriberThreadPoolTaskScheduler_globalConfigurationSet() {
contextRunner
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.executor-threads=7")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
ThreadPoolTaskScheduler globalSchedulerBean =
(ThreadPoolTaskScheduler) ctx.getBean(
"globalPubSubSubscriberThreadPoolScheduler");

assertThat(gcpPubSubProperties.getSubscriber().getExecutorThreads()).isEqualTo(7);
assertThat(globalSchedulerBean.getThreadNamePrefix())
.isEqualTo("global-gcp-pubsub-subscriber");
assertThat(FieldUtils.readField(globalSchedulerBean, "poolSize", true)).isEqualTo(7);
assertThat(globalSchedulerBean.isDaemon()).isTrue();
});
}

@Test
void subscriberExecutorProvider_globalConfigurationSet() {
contextRunner
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.executor-threads=7")
.run(
ctx -> {
DefaultSubscriberFactory factory =
(DefaultSubscriberFactory) ctx.getBean("defaultSubscriberFactory");
ExecutorProvider globalExecutorProvider =
(ExecutorProvider) ctx.getBean("globalSubscriberExecutorProvider");

assertThat(globalExecutorProvider).isNotNull();
assertThat(factory.getExecutorProvider("other")).isSameAs(globalExecutorProvider);
});
}

@Test
void threadPoolTaskScheduler_selectiveConfigurationSet() {
Expand All @@ -349,18 +326,11 @@ void threadPoolTaskScheduler_selectiveConfigurationSet() {
ThreadPoolTaskScheduler selectiveScheduler =
(ThreadPoolTaskScheduler) ctx.getBean(
"threadPoolScheduler_projects/fake project/subscriptions/subscription-name");
ThreadPoolTaskScheduler globalScheduler =
(ThreadPoolTaskScheduler) ctx.getBean(
"globalPubSubSubscriberThreadPoolScheduler");
assertThat(selectiveScheduler.getThreadNamePrefix())
.isEqualTo(
"gcp-pubsub-subscriber-projects/fake project/subscriptions/subscription-name");
assertThat(selectiveScheduler.isDaemon()).isTrue();
assertThat(FieldUtils.readField(selectiveScheduler, "poolSize", true)).isEqualTo(7);
assertThat(globalScheduler.getThreadNamePrefix())
.isEqualTo("global-gcp-pubsub-subscriber");
assertThat(FieldUtils.readField(globalScheduler, "poolSize", true)).isEqualTo(4);
assertThat(globalScheduler.isDaemon()).isTrue();
});
}

Expand Down Expand Up @@ -393,10 +363,6 @@ void subscriberExecutorProvider_selectiveConfigurationSet() {
ExecutorProvider selectiveExecutorProvider =
(ExecutorProvider) ctx.getBean(
"subscriberExecutorProvider-projects/fake project/subscriptions/subscription-name");
ExecutorProvider globalExecutorProvider =
(ExecutorProvider) ctx.getBean("globalSubscriberExecutorProvider");

assertThat(globalExecutorProvider).isNotNull();
assertThat(selectiveExecutorProvider).isNotNull();
assertThat(factory.getExecutorProvider("subscription-name"))
.isSameAs(selectiveExecutorProvider);
Expand All @@ -416,40 +382,11 @@ void threadPoolScheduler_globalAndSelectiveConfigurationSet() {
ThreadPoolTaskScheduler selectiveScheduler =
(ThreadPoolTaskScheduler) ctx.getBean(
"threadPoolScheduler_projects/fake project/subscriptions/subscription-name");
ThreadPoolTaskScheduler globalScheduler =
(ThreadPoolTaskScheduler) ctx.getBean(
"globalPubSubSubscriberThreadPoolScheduler");
assertThat(selectiveScheduler.getThreadNamePrefix())
.isEqualTo(
"gcp-pubsub-subscriber-projects/fake project/subscriptions/subscription-name");
assertThat(FieldUtils.readField(selectiveScheduler, "poolSize", true)).isEqualTo(3);
assertThat(selectiveScheduler.isDaemon()).isTrue();
assertThat(globalScheduler.getThreadNamePrefix())
.isEqualTo("global-gcp-pubsub-subscriber");
assertThat(FieldUtils.readField(globalScheduler, "poolSize", true)).isEqualTo(5);
assertThat(globalScheduler.isDaemon()).isTrue();
});
}

@Test
void threadPoolTaskScheduler_globalAndDifferentSelectiveConfigurationSet_onlyGlobalCreated() {
contextRunner
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.executor-threads=5",
"spring.cloud.gcp.pubsub.subscription.subscription-name.parallel-pull-count=3")
.run(
ctx -> {

// Verify that only global thread pool task scheduler is created
ThreadPoolTaskScheduler globalScheduler =
(ThreadPoolTaskScheduler) ctx.getBean(
"globalPubSubSubscriberThreadPoolScheduler");

assertThat(globalScheduler.getThreadNamePrefix())
.isEqualTo("global-gcp-pubsub-subscriber");
assertThat(globalScheduler.isDaemon()).isTrue();
assertThat(FieldUtils.readField(globalScheduler, "poolSize", true)).isEqualTo(5);
assertThat(ctx.containsBean("threadPoolScheduler_subscription-name")).isFalse();
});
}

Expand All @@ -461,15 +398,8 @@ void subscriberExecutorProvider_globalAndDifferentSelectiveConfigurationSet_only
"spring.cloud.gcp.pubsub.subscription.subscription-name.parallel-pull-count=3")
.run(
ctx -> {
DefaultSubscriberFactory factory =
(DefaultSubscriberFactory) ctx.getBean("defaultSubscriberFactory");

// Verify that global executor provider is created and used
ExecutorProvider globalExecutorProvider =
(ExecutorProvider) ctx.getBean("globalSubscriberExecutorProvider");
assertThat(
ctx.containsBean("subscriberExecutorProvider-subscription-name")).isFalse();
assertThat(factory.getGlobalExecutorProvider()).isSameAs(globalExecutorProvider);
});
}

Expand All @@ -486,12 +416,7 @@ void subscriberExecutorProvider_globalAndSelectiveConfigurationSet_selectiveTake
ExecutorProvider selectiveExecutorProvider =
(ExecutorProvider) ctx.getBean(
"subscriberExecutorProvider-projects/fake project/subscriptions/subscription-name");
ExecutorProvider globalExecutorProvider =
(ExecutorProvider) ctx.getBean("globalSubscriberExecutorProvider");

assertThat(selectiveExecutorProvider).isNotNull();
assertThat(globalExecutorProvider).isNotNull();
assertThat(factory.getGlobalExecutorProvider()).isNotNull();
assertThat(factory.getExecutorProvider("subscription-name"))
.isSameAs(selectiveExecutorProvider);
});
Expand All @@ -507,7 +432,7 @@ void pullConfig_defaultConfigurationSet() {
assertThat(
gcpPubSubProperties.computeMaxAckExtensionPeriod(
"subscription-name", projectIdProvider.getProjectId()))
.isZero();
.isNull();
assertThat(
gcpPubSubProperties.computeMinDurationPerAckExtension(
"subscription-name", projectIdProvider.getProjectId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,9 @@ void testPull() {
assertThat(scheduler.getThreadNamePrefix()).isEqualTo(
"gcp-pubsub-subscriber-" + fullSubscriptionNameSub1);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean(
"subscriberExecutorProvider-" + fullSubscriptionNameSub1))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
assertThat(gcpPubSubProperties.computeRetryableCodes(subscriptionName, projectId))
.isEqualTo(new Code[]{Code.INTERNAL});
assertThat(gcpPubSubProperties.computePullEndpoint(fullSubscriptionNameSub1, projectId))
Expand Down Expand Up @@ -240,15 +234,9 @@ void testSubscribe() {
assertThat(scheduler.getThreadNamePrefix()).isEqualTo(
"gcp-pubsub-subscriber-" + fullSubscriptionNameSub2);
assertThat(scheduler.isDaemon()).isTrue();
assertThat(
(ThreadPoolTaskScheduler)
context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat((ExecutorProvider) context.getBean(
"subscriberExecutorProvider-" + fullSubscriptionNameSub2))
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
} finally {
pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public class PubSubConfiguration {

private static final Logger logger = LoggerFactory.getLogger(PubSubConfiguration.class);

/** Default number of executor threads. */
public static final int DEFAULT_EXECUTOR_THREADS = 4;

private static final Long DEFAULT_MAX_ACK_EXTENSION_PERIOD = 0L;

/**
* Automatically extracted user-provided properties. Contains only short subscription keys
* user-provided properties, therefore do not use except in initialize().
Expand Down Expand Up @@ -225,10 +220,7 @@ public Long computeMaxAckExtensionPeriod(String subscriptionName, String project
if (maxAckExtensionPeriod != null) {
return maxAckExtensionPeriod;
}
Long globalMaxAckExtensionPeriod = this.globalSubscriber.getMaxAckExtensionPeriod();
return globalMaxAckExtensionPeriod != null
? globalMaxAckExtensionPeriod
: DEFAULT_MAX_ACK_EXTENSION_PERIOD;
return this.globalSubscriber.getMaxAckExtensionPeriod();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

private Map<ProjectSubscriptionName, ExecutorProvider> executorProviderMap = new ConcurrentHashMap<>();

private ExecutorProvider globalExecutorProvider;

private Code[] retryableCodes;

/**
Expand Down Expand Up @@ -362,8 +360,7 @@ SubscriberStubSettings buildGlobalSubscriberStubSettings() throws IOException {
subscriberStubSettings.setEndpoint(endpoint);
}

ExecutorProvider executor =
this.executorProvider != null ? this.executorProvider : this.globalExecutorProvider;
ExecutorProvider executor = this.executorProvider;
if (executor != null) {
subscriberStubSettings.setBackgroundExecutorProvider(executor);
}
Expand Down Expand Up @@ -452,7 +449,7 @@ public ExecutorProvider getExecutorProvider(String subscriptionName) {
if (this.executorProviderMap.containsKey(projectSubscriptionName)) {
return this.executorProviderMap.get(projectSubscriptionName);
}
return this.globalExecutorProvider;
return null;
}

/**
Expand Down Expand Up @@ -499,8 +496,11 @@ Duration getMaxAckExtensionPeriod(String subscriptionName) {
if (this.maxAckExtensionPeriod != null) {
return this.maxAckExtensionPeriod;
}
return Duration.ofSeconds(
this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId));
Long maxAckExtensionPeriod = this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId);
if (maxAckExtensionPeriod != null) {
return Duration.ofSeconds(maxAckExtensionPeriod);
}
return null;
}

@Nullable
Expand Down Expand Up @@ -576,14 +576,6 @@ public void setExecutorProviderMap(Map<ProjectSubscriptionName, ExecutorProvider
this.executorProviderMap = executorProviderMap;
}

public void setGlobalExecutorProvider(ExecutorProvider executorProvider) {
this.globalExecutorProvider = executorProvider;
}

public ExecutorProvider getGlobalExecutorProvider() {
return this.globalExecutorProvider;
}

public void setFlowControlSettingsMap(
Map<ProjectSubscriptionName, FlowControlSettings> flowControlSettingsMap) {
this.flowControlSettingsMap = flowControlSettingsMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void testComputeMaxAckExtensionPeriod_returnDefault() {
Long result =
pubSubConfiguration.computeMaxAckExtensionPeriod("subscription-name", "projectId");

assertThat(result).isZero();
assertThat(result).isNull();
}

@Test
Expand Down
Loading
Loading