Skip to content

Commit

Permalink
Support long polling in rocketmq proxy in the protocol (#5788)
Browse files Browse the repository at this point in the history
* Add long polling

* Change rocketmq-proto version to 2.0.2

* fix checkstyle

* Fix rocketmq-proto version

Signed-off-by: Li Zhanhui <[email protected]>

* Change pollTime to timeRemaining

* fix test

Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
  • Loading branch information
drpmma and lizhanhui authored Dec 30, 2022
1 parent 59dfe8d commit 83c4a8f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 41 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ maven_install(
"org.bouncycastle:bcpkix-jdk15on:1.69",
"com.google.code.gson:gson:2.8.9",
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
"org.apache.rocketmq:rocketmq-proto:2.0.1",
"org.apache.rocketmq:rocketmq-proto:2.0.2",
"com.google.protobuf:protobuf-java:3.20.1",
"com.google.protobuf:protobuf-java-util:3.20.1",
"com.conversantmedia:disruptor:1.2.10",
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
<rocketmq-proto.version>2.0.1</rocketmq-proto.version>
<rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<grpc.version>1.50.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public class ProxyConfig implements ConfigFile {
private long grpcClientProducerBackoffInitialMillis = 10;
private long grpcClientProducerBackoffMaxMillis = 1000;
private int grpcClientProducerBackoffMultiplier = 2;
private long grpcClientConsumerLongPollingTimeoutMillis = Duration.ofSeconds(30).toMillis();
private long grpcClientConsumerMinLongPollingTimeoutMillis = Duration.ofSeconds(5).toMillis();
private long grpcClientConsumerMaxLongPollingTimeoutMillis = Duration.ofSeconds(20).toMillis();
private int grpcClientConsumerLongPollingBatchSize = 32;
private long grpcClientIdleTimeMills = Duration.ofSeconds(120).toMillis();

Expand Down Expand Up @@ -598,12 +599,20 @@ public void setGrpcClientProducerBackoffMultiplier(int grpcClientProducerBackoff
this.grpcClientProducerBackoffMultiplier = grpcClientProducerBackoffMultiplier;
}

public long getGrpcClientConsumerLongPollingTimeoutMillis() {
return grpcClientConsumerLongPollingTimeoutMillis;
public long getGrpcClientConsumerMinLongPollingTimeoutMillis() {
return grpcClientConsumerMinLongPollingTimeoutMillis;
}

public void setGrpcClientConsumerLongPollingTimeoutMillis(long grpcClientConsumerLongPollingTimeoutMillis) {
this.grpcClientConsumerLongPollingTimeoutMillis = grpcClientConsumerLongPollingTimeoutMillis;
public void setGrpcClientConsumerMinLongPollingTimeoutMillis(long grpcClientConsumerMinLongPollingTimeoutMillis) {
this.grpcClientConsumerMinLongPollingTimeoutMillis = grpcClientConsumerMinLongPollingTimeoutMillis;
}

public long getGrpcClientConsumerMaxLongPollingTimeoutMillis() {
return grpcClientConsumerMaxLongPollingTimeoutMillis;
}

public void setGrpcClientConsumerMaxLongPollingTimeoutMillis(long grpcClientConsumerMaxLongPollingTimeoutMillis) {
this.grpcClientConsumerMaxLongPollingTimeoutMillis = grpcClientConsumerMaxLongPollingTimeoutMillis;
}

public int getGrpcClientConsumerLongPollingBatchSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected static Settings mergeSubscriptionData(Settings settings, SubscriptionG

resultSettingsBuilder.getSubscriptionBuilder()
.setReceiveBatchSize(config.getGrpcClientConsumerLongPollingBatchSize())
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerLongPollingTimeoutMillis()))
.setLongPollingTimeout(Durations.fromMillis(config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()))
.setFifo(groupConfig.isConsumeMessageOrderly());

resultSettingsBuilder.getBackoffPolicyBuilder().setMaxAttempts(groupConfig.getRetryMaxTimes() + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,27 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
ProxyConfig config = ConfigurationManager.getProxyConfig();

Long timeRemaining = ctx.getRemainingMs();
long pollTime = timeRemaining - Durations.toMillis(settings.getRequestTimeout()) / 2;
if (pollTime < 0) {
pollTime = 0;
long pollingTime;
if (request.hasLongPollingTimeout()) {
pollingTime = Durations.toMillis(request.getLongPollingTimeout());
} else {
pollingTime = timeRemaining - Durations.toMillis(settings.getRequestTimeout()) / 2;
}
if (pollingTime < config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = config.getGrpcClientConsumerMinLongPollingTimeoutMillis();
}
if (pollingTime > config.getGrpcClientConsumerMaxLongPollingTimeoutMillis()) {
pollingTime = config.getGrpcClientConsumerMaxLongPollingTimeoutMillis();
}
if (pollTime > config.getGrpcClientConsumerLongPollingTimeoutMillis()) {
pollTime = config.getGrpcClientConsumerLongPollingTimeoutMillis();

if (pollingTime > timeRemaining) {
if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = timeRemaining;
} else {
writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME, "The deadline time remaining is not enough" +
" for polling, please check network condition");
return;
}
}

validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), request.getGroup());
Expand All @@ -100,37 +115,37 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
}

this.messagingProcessor.popMessage(
ctx,
new ReceiveMessageQueueSelector(
request.getMessageQueue().getBroker().getName()
),
group,
topic,
request.getBatchSize(),
actualInvisibleTime,
pollTime,
ConsumeInitMode.MAX,
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
timeRemaining
).thenAccept(popResult -> {
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
List<MessageExt> messageExtList = popResult.getMsgFoundList();
for (MessageExt messageExt : messageExtList) {
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
ctx,
new ReceiveMessageQueueSelector(
request.getMessageQueue().getBroker().getName()
),
group,
topic,
request.getBatchSize(),
actualInvisibleTime,
pollingTime,
ConsumeInitMode.MAX,
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
timeRemaining
).thenAccept(popResult -> {
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
List<MessageExt> messageExtList = popResult.getMsgFoundList();
for (MessageExt messageExt : messageExtList) {
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
}
}
}
}
}
writer.writeAndComplete(ctx, request, popResult);
})
writer.writeAndComplete(ctx, request, popResult);
})
.exceptionally(t -> {
writer.writeAndComplete(ctx, request, t);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
@Before
public void before() throws Throwable {
super.before();
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor,
grpcClientSettingsManager, grpcChannelManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void setUp() throws Exception {
ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName());
ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3);
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
}

protected MessagingServiceGrpc.MessagingServiceStub createStub(Channel channel) {
Expand Down

0 comments on commit 83c4a8f

Please sign in to comment.