Skip to content

Commit

Permalink
Entry Array (#162)
Browse files Browse the repository at this point in the history
* Entry Array

* Fix overhead error

* Sorted count from 0 (fail test)

* Sorted count from 0 (pass test)

* Adding EntryArray that encapsulates all functionality required to keep array of entries on top of primitive longs array

* Fixing code review comments

Co-authored-by: Liran Funaro <[email protected]>
  • Loading branch information
sanastas and liran-funaro authored Jun 1, 2021
1 parent 73b7881 commit c2c882e
Show file tree
Hide file tree
Showing 8 changed files with 598 additions and 464 deletions.
139 changes: 86 additions & 53 deletions core/src/main/java/com/yahoo/oak/Chunk.java

Large diffs are not rendered by default.

399 changes: 399 additions & 0 deletions core/src/main/java/com/yahoo/oak/EntryArray.java

Large diffs are not rendered by default.

467 changes: 79 additions & 388 deletions core/src/main/java/com/yahoo/oak/EntrySet.java

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions core/src/main/java/com/yahoo/oak/InternalOakMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private Chunk<K, V> iterateChunks(final Chunk<K, V> inputChunk, K key) {
boolean overwriteExistingValueForMove(ThreadContext ctx, V newVal, Chunk<K, V> c) {
// given old entry index (inside ctx) and new value, while old value is locked,
// allocate new value, new value is going to be locked as well, write the new value
c.writeValue(ctx, newVal, true);
c.allocateValue(ctx, newVal, true);

// in order to connect/overwrite the old entry to point to new value
// we need to publish as in the normal write process
Expand Down Expand Up @@ -396,7 +396,7 @@ private boolean allocateAndLinkEntry(Chunk c, ThreadContext ctx, K key, boolean
// with existing entry scenario, otherwise we can reuse this entry because
// its value is invalid.
c.releaseKey(ctx);
if (!c.isValueRefValid(prevEi)) {
if (!c.isValueRefValidAndNotDeleted(prevEi)) {
return false;
}
// We use an existing entry only if its value reference is invalid
Expand Down Expand Up @@ -462,7 +462,7 @@ V put(K key, V value, OakTransformer<V> transformer) {
}
}

c.writeValue(ctx, value, false); // write value in place
c.allocateValue(ctx, value, false); // write value in place

if (!c.publish()) {
c.releaseNewValue(ctx);
Expand Down Expand Up @@ -522,7 +522,7 @@ Result putIfAbsent(K key, V value, OakTransformer<V> transformer) {
}
}

c.writeValue(ctx, value, false); // write value in place
c.allocateValue(ctx, value, false); // write value in place

if (!c.publish()) {
c.releaseNewValue(ctx);
Expand Down Expand Up @@ -582,7 +582,7 @@ boolean putIfAbsentComputeIfPresent(K key, V value, Consumer<OakScopedWriteBuffe
}
}

c.writeValue(ctx, value, false); // write value in place
c.allocateValue(ctx, value, false); // write value in place

if (!c.publish()) {
c.releaseNewValue(ctx);
Expand Down Expand Up @@ -661,9 +661,9 @@ Result remove(K key, V oldValue, OakTransformer<V> transformer) {

// AT THIS POINT value was marked deleted off-heap by this thread,
// continue to set the entry's value reference as deleted
assert ctx.entryIndex > 0;
assert ctx.entryIndex != EntrySet.INVALID_ENTRY_INDEX;
assert ctx.isValueValid();
ctx.valueState = EntrySet.ValueState.DELETED_NOT_FINALIZED;
ctx.entryState = EntryArray.EntryState.DELETED_NOT_FINALIZED;
finalizeDeletion(c, ctx); // includes publish/unpublish
return transformer == null ? ctx.result.withFlag(logicallyDeleted) : ctx.result.withValue(v);
}
Expand Down Expand Up @@ -1119,7 +1119,7 @@ void advance(boolean needsValue) {

if (needsValue) {
// Set value references and checks for value validity.
// if value is deleted ctx.valueState is going to be invalid
// if value is deleted ctx.entryState is going to be invalid
c.readValue(ctx);
validState = ctx.isValueValid();
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/com/yahoo/oak/Rebalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ boolean createNewChunks(ThreadContext ctx) {
ValueBuffer valueBuff = ctx.tempValue;

while (true) {
ei = currNewChunk.copyPartNoKeys(valueBuff, currFrozen, ei, entriesLowThreshold);
ei = currNewChunk.copyPartOfEntries(valueBuff, currFrozen, ei, entriesLowThreshold);
// if completed reading curr frozen chunk
if (ei == Chunk.NONE_NEXT) {
if (!iterFrozen.hasNext()) {
Expand Down Expand Up @@ -185,12 +185,12 @@ private void completeCopy(ValueBuffer tempValue, Chunk<K, V> dest, final int ei,
Iterator<Chunk<K, V>> iter = srcChunks.iterator();

Chunk<K, V> src = iter.next();
dest.copyPartNoKeys(tempValue, src, ei, maxItems);
dest.copyPartOfEntries(tempValue, src, ei, maxItems);

while (iter.hasNext()) {
Chunk<K, V> curSrc = iter.next();
int curEntryIndex = src.getFirstItemEntryIndex();
dest.copyPartNoKeys(tempValue, curSrc, curEntryIndex, maxItems);
dest.copyPartOfEntries(tempValue, curSrc, curEntryIndex, maxItems);
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/com/yahoo/oak/ThreadContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ThreadContext {
final KeyBuffer key;

/* The state of the value */
EntrySet.ValueState valueState;
EntryArray.EntryState entryState;

/* value is used for easier access to the off-heap memory */
final ValueBuffer value;
Expand All @@ -32,7 +32,7 @@ class ThreadContext {
/**
* This parameter encapsulates the allocation information, from when value write started
* and until value write was committed. It should not be used for other purposes, just transferred
* between writeValueStart (return parameter) to writeValueCommit (input parameter)
* between allocateValue (return parameter) to writeValueCommit (input parameter)
*/
final ValueBuffer newValue;

Expand All @@ -57,7 +57,7 @@ class ThreadContext {

ThreadContext(MemoryManager kmm, MemoryManager vmm) {
entryIndex = EntrySet.INVALID_ENTRY_INDEX;
valueState = EntrySet.ValueState.UNKNOWN;
entryState = EntryArray.EntryState.UNKNOWN;
isNewValueForMove = false;

this.key = new KeyBuffer(kmm.getEmptySlice());
Expand All @@ -74,7 +74,7 @@ void invalidate() {
value.invalidate();
newValue.invalidate();
result.invalidate();
valueState = EntrySet.ValueState.UNKNOWN;
entryState = EntryArray.EntryState.UNKNOWN;
isNewValueForMove = false;
// No need to invalidate the temporary buffers
}
Expand All @@ -100,11 +100,11 @@ boolean isKeyValid() {
}

/**
* See {@code ValueState.isValid()} for more details.
* See {@code EntryState.isValid()} for more details.
*
* @return does the entry have a valid value
*/
boolean isValueValid() {
return valueState.isValid();
return entryState.isValid();
}
}
13 changes: 9 additions & 4 deletions core/src/test/java/com/yahoo/oak/MultiThreadRangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@ public Void call() throws InterruptedException {
public void testRange() throws ExecutorUtils.ExecutionError {
executor.submitTasks(NUM_THREADS, i -> new MultiThreadRangeTest.RunThreads(latch));

final int numOfItems = 5 * MAX_ITEMS_PER_CHUNK;
final int maxItem = numOfItems * 2;

// fill
Random r = new Random();
for (int i = 5 * MAX_ITEMS_PER_CHUNK; i > 0; ) {
Integer j = r.nextInt(10 * MAX_ITEMS_PER_CHUNK);
for (int i = numOfItems; i > 0; ) {
Integer j = r.nextInt(maxItem);
if (oak.zc().putIfAbsent(j, j)) {
i--;
}
Expand All @@ -84,12 +87,14 @@ public void testRange() throws ExecutorUtils.ExecutionError {
latch.countDown();
executor.shutdown(TIME_LIMIT_IN_SECONDS);

Assert.assertEquals(numOfItems, oak.size());

int size = 0;
for (int i = 0; i < 10 * MAX_ITEMS_PER_CHUNK; i++) {
for (int i = 0; i < maxItem; i++) {
if (oak.get(i) != null) {
size++;
}
}
Assert.assertEquals(5 * MAX_ITEMS_PER_CHUNK, size);
Assert.assertEquals(numOfItems, size);
}
}
10 changes: 8 additions & 2 deletions core/src/test/java/com/yahoo/oak/OverheadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.yahoo.oak.common.OakCommonBuildersFactory;
import com.yahoo.oak.common.integer.OakIntSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -33,7 +34,7 @@ public void init() {
}

@Test
public void main() {
public void validateOverhead() {
Random r = new Random();
for (int i = 0; i < (int) Math.round(NUM_OF_ENTRIES * 0.5); ) {
Integer key = r.nextInt(NUM_OF_ENTRIES);
Expand All @@ -49,8 +50,13 @@ public void main() {
double usedHeapMemoryMB = (double) (heapSize - heapFreeSize) / M;
double usedOffHeapMemoryMB = (double) (oak.getValuesMemoryManager().allocated()) / M;


double heapOverhead = usedHeapMemoryMB / (usedHeapMemoryMB + usedOffHeapMemoryMB);

System.out.println("Observed On Heap Overhead: " + heapOverhead);
assert heapOverhead < MAX_ON_HEAP_OVERHEAD_PERCENTAGE;
Assert.assertTrue(
"Observed On Heap Overhead: " + heapOverhead,
heapOverhead < MAX_ON_HEAP_OVERHEAD_PERCENTAGE
);
}
}

0 comments on commit c2c882e

Please sign in to comment.