From e9619fa597bcb099da79ff9857a854bca86a63db Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 3 May 2019 10:09:40 -0700 Subject: [PATCH] Avoid payload copy when inserting into managed ledger cache (#4197) --- conf/broker.conf | 3 ++ conf/standalone.conf | 3 ++ .../mledger/ManagedLedgerFactoryConfig.java | 5 +++ .../mledger/impl/EntryCacheImpl.java | 45 +++++++++++++------ .../mledger/impl/EntryCacheManager.java | 2 +- .../pulsar/broker/ServiceConfiguration.java | 2 + .../broker/ManagedLedgerClientFactory.java | 1 + site2/docs/reference-configuration.md | 1 + 8 files changed, 47 insertions(+), 15 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index fb2cb1e36e04b..4ae8522044a79 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -437,6 +437,9 @@ managedLedgerNumSchedulerThreads=8 # running in the same broker. By default, uses 1/5th of available direct memory managedLedgerCacheSizeMB= +# Whether we should make a copy of the entry payloads when inserting in cache +managedLedgerCacheCopyEntries=false + # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 diff --git a/conf/standalone.conf b/conf/standalone.conf index b8f13c51a0489..7187a95ec1e66 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -298,6 +298,9 @@ managedLedgerNumSchedulerThreads=4 # running in the same broker. By default, uses 1/5th of available direct memory managedLedgerCacheSizeMB= +# Whether we should make a copy of the entry payloads when inserting in cache +managedLedgerCacheCopyEntries=false + # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 40b815df76613..b5d0a7c23610b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -51,4 +51,9 @@ public class ManagedLedgerFactoryConfig { * Threshould to consider a cursor as "backlogged" */ private long thresholdBackloggedCursor = 1000; + + /** + * Whether we should make a copy of the entry payloads when inserting in cache + */ + private boolean copyEntriesInCache = false; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 053e9002cc977..c08fb9c1d4f5f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -51,13 +51,15 @@ public class EntryCacheImpl implements EntryCache { private final EntryCacheManager manager; private final ManagedLedgerImpl ml; private final RangeCache entries; + private final boolean copyEntries; private static final double MB = 1024 * 1024; - public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml) { + public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); + this.copyEntries = copyEntries; if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -95,6 +97,31 @@ public boolean insert(EntryImpl entry) { entry.getLength()); } + ByteBuf cachedData = null; + if (copyEntries) { + cachedData = copyEntry(entry); + if (cachedData == null) { + return false; + } + } else { + // Use retain here to have the same counter increase as in the copy entry scenario + cachedData = entry.getDataBuffer().retain(); + } + + PositionImpl position = entry.getPosition(); + EntryImpl cacheEntry = EntryImpl.create(position, cachedData); + cachedData.release(); + if (entries.put(position, cacheEntry)) { + manager.entryAdded(entry.getLength()); + return true; + } else { + // entry was not inserted into cache, we need to discard it + cacheEntry.release(); + return false; + } + } + + private ByteBuf copyEntry(EntryImpl entry) { // Copy the entry into a buffer owned by the cache. The reason is that the incoming entry is retaining a buffer // from netty, usually allocated in 64Kb chunks. So if we just retain the entry without copying it, we might // retain actually the full 64Kb even for a small entry @@ -103,8 +130,8 @@ public boolean insert(EntryImpl entry) { try { cachedData = ALLOCATOR.directBuffer(size, size); } catch (Throwable t) { - log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage(), t); - return false; + log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage()); + return null; } if (size > 0) { @@ -114,17 +141,7 @@ public boolean insert(EntryImpl entry) { entryBuf.readerIndex(readerIdx); } - PositionImpl position = entry.getPosition(); - EntryImpl cacheEntry = EntryImpl.create(position, cachedData); - cachedData.release(); - if (entries.put(position, cacheEntry)) { - manager.entryAdded(entry.getLength()); - return true; - } else { - // entry was not inserted into cache, we need to discard it - cacheEntry.release(); - return false; - } + return cachedData; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 526796e3c26bd..2bcf111b05bc3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -81,7 +81,7 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) { return new EntryCacheDisabled(ml); } - EntryCache newEntryCache = new EntryCacheImpl(this, ml); + EntryCache newEntryCache = new EntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache()); EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache); if (currentEntryCache != null) { return currentEntryCache; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3825a86b45fc1..5428d557a90e5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -763,6 +763,8 @@ public class ServiceConfiguration implements PulsarConfiguration { + " running in the same broker. By default, uses 1/5th of available direct memory") private int managedLedgerCacheSizeMB = Math.max(64, (int) (PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when inserting in cache") + private boolean managedLedgerCacheCopyEntries = false; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Threshold to which bring down the cache level when eviction is triggered" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index e3a52d8d7d5fe..1f401d5671b9e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -50,6 +50,7 @@ public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient, managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency()); managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold()); + managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, zkClient, managedLedgerFactoryConfig); } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 1f828adc17a12..a783f3a75442f 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -175,6 +175,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |managedLedgerDefaultWriteQuorum| Number of copies to store for each message |2| |managedLedgerDefaultAckQuorum| Number of guaranteed copies (acks to wait before write is complete) |2| |managedLedgerCacheSizeMB| Amount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. By default, uses 1/5th of available direct memory || +|managedLedgerCacheCopyEntries| Whether we should make a copy of the entry payloads when inserting in cache| false| |managedLedgerCacheEvictionWatermark| Threshold to which bring down the cache level when eviction is triggered |0.9| |managedLedgerCacheEvictionFrequency| Configure the cache eviction frequency for the managed ledger cache (evictions/sec) | 100.0 | |managedLedgerCacheEvictionTimeThresholdMillis| All entries that have stayed in cache for more than the configured time, will be evicted | 1000 |