Skip to content

Commit

Permalink
[improve][broker] Avoid record inactiveproducers when deduplication i…
Browse files Browse the repository at this point in the history
…s disable. (#21193)

Co-authored-by: Jiwe Guo <[email protected]>
  • Loading branch information
lifepuzzlefun and Technoboy- authored Dec 21, 2023
1 parent b944f10 commit 8beac8b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ private boolean isDeduplicationEnabled() {
* Topic will call this method whenever a producer connects.
*/
public void producerAdded(String producerName) {
if (!isEnabled()) {
return;
}

// Producer is no-longer inactive
inactiveProducers.remove(producerName);
}
Expand All @@ -482,6 +486,10 @@ public void producerAdded(String producerName) {
* Topic will call this method whenever a producer disconnects.
*/
public void producerRemoved(String producerName) {
if (!isEnabled()) {
return;
}

// Producer is no-longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
}
Expand All @@ -493,6 +501,14 @@ public synchronized void purgeInactiveProducers() {
long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());

// if not enabled just clear all inactive producer record.
if (!isEnabled()) {
if (!inactiveProducers.isEmpty()) {
inactiveProducers.clear();
}
return;
}

Iterator<Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
boolean hasInactive = false;
while (mapIterator.hasNext()) {
Expand Down Expand Up @@ -545,5 +561,10 @@ ManagedCursor getManagedCursor() {
return managedCursor;
}

@VisibleForTesting
Map<String, Long> getInactiveProducers() {
return inactiveProducers;
}

private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
Expand All @@ -47,17 +48,24 @@
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class MessageDuplicationTest {
public class MessageDuplicationTest extends BrokerTestBase {

private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
Expand Down Expand Up @@ -443,4 +451,43 @@ public void completed(Exception e, long ledgerId, long entryId) {
}
});
}

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.conf.setBrokerDeduplicationEnabled(true);
super.baseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testMessageDeduplication() throws Exception {
String topicName = "persistent://prop/ns-abc/testMessageDeduplication";
String producerName = "test-producer";
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.producerName(producerName)
.topic(topicName)
.create();
final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicIfExists(topicName).get().orElse(null);
assertNotNull(persistentTopic);
final MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
assertFalse(messageDeduplication.getInactiveProducers().containsKey(producerName));
producer.close();
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.getInactiveProducers().containsKey(producerName)));
admin.topicPolicies().setDeduplicationStatus(topicName, false);
Awaitility.await().untilAsserted(() -> {
final Boolean deduplicationStatus = admin.topicPolicies().getDeduplicationStatus(topicName);
Assert.assertNotNull(deduplicationStatus);
Assert.assertFalse(deduplicationStatus);
});
messageDeduplication.purgeInactiveProducers();
assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
}
}

0 comments on commit 8beac8b

Please sign in to comment.