Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Avoid payload copy when inserting into managed ledger cache (#4197)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 3, 2019
1 parent 0dac8d1 commit e9619fa
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 15 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ managedLedgerNumSchedulerThreads=8
# running in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=

# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ managedLedgerNumSchedulerThreads=4
# running in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=

# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public class ManagedLedgerFactoryConfig {
* Threshould to consider a cursor as "backlogged"
*/
private long thresholdBackloggedCursor = 1000;

/**
* Whether we should make a copy of the entry payloads when inserting in cache
*/
private boolean copyEntriesInCache = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ public class EntryCacheImpl implements EntryCache {
private final EntryCacheManager manager;
private final ManagedLedgerImpl ml;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;

private static final double MB = 1024 * 1024;

public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml) {
public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
this.copyEntries = copyEntries;

if (log.isDebugEnabled()) {
log.debug("[{}] Initialized managed-ledger entry cache", ml.getName());
Expand Down Expand Up @@ -95,6 +97,31 @@ public boolean insert(EntryImpl entry) {
entry.getLength());
}

ByteBuf cachedData = null;
if (copyEntries) {
cachedData = copyEntry(entry);
if (cachedData == null) {
return false;
}
} else {
// Use retain here to have the same counter increase as in the copy entry scenario
cachedData = entry.getDataBuffer().retain();
}

PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
manager.entryAdded(entry.getLength());
return true;
} else {
// entry was not inserted into cache, we need to discard it
cacheEntry.release();
return false;
}
}

private ByteBuf copyEntry(EntryImpl entry) {
// Copy the entry into a buffer owned by the cache. The reason is that the incoming entry is retaining a buffer
// from netty, usually allocated in 64Kb chunks. So if we just retain the entry without copying it, we might
// retain actually the full 64Kb even for a small entry
Expand All @@ -103,8 +130,8 @@ public boolean insert(EntryImpl entry) {
try {
cachedData = ALLOCATOR.directBuffer(size, size);
} catch (Throwable t) {
log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage(), t);
return false;
log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage());
return null;
}

if (size > 0) {
Expand All @@ -114,17 +141,7 @@ public boolean insert(EntryImpl entry) {
entryBuf.readerIndex(readerIdx);
}

PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
manager.entryAdded(entry.getLength());
return true;
} else {
// entry was not inserted into cache, we need to discard it
cacheEntry.release();
return false;
}
return cachedData;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
return new EntryCacheDisabled(ml);
}

EntryCache newEntryCache = new EntryCacheImpl(this, ml);
EntryCache newEntryCache = new EntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache());
EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache);
if (currentEntryCache != null) {
return currentEntryCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
(int) (PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));
@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Threshold to which bring down the cache level when eviction is triggered"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());

this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, zkClient, managedLedgerFactoryConfig);
}
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|managedLedgerDefaultWriteQuorum| Number of copies to store for each message |2|
|managedLedgerDefaultAckQuorum| Number of guaranteed copies (acks to wait before write is complete) |2|
|managedLedgerCacheSizeMB| Amount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. By default, uses 1/5th of available direct memory ||
|managedLedgerCacheCopyEntries| Whether we should make a copy of the entry payloads when inserting in cache| false|
|managedLedgerCacheEvictionWatermark| Threshold to which bring down the cache level when eviction is triggered |0.9|
|managedLedgerCacheEvictionFrequency| Configure the cache eviction frequency for the managed ledger cache (evictions/sec) | 100.0 |
|managedLedgerCacheEvictionTimeThresholdMillis| All entries that have stayed in cache for more than the configured time, will be evicted | 1000 |
Expand Down

0 comments on commit e9619fa

Please sign in to comment.