Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proper hash key collision detection scheme. #85

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion calculate_average_ebarlas.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

sdk use java 21.0.1-graalce
JAVA_OPTS=""
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ebarlas measurements.txt 16
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ebarlas measurements.txt 8
97 changes: 67 additions & 30 deletions src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class CalculateAverage_ebarlas {

private static final int HASH_FACTOR = 278;
private static final int HASH_MOD = 3_487;
private static final int MAX_KEY_SIZE = 100;
private static final int HASH_FACTOR = 433;
private static final int HASH_TBL_SIZE = 16_383; // range of allowed hash values, inclusive

public static void main(String[] args) throws IOException, InterruptedException {
if (args.length != 2) {
Expand Down Expand Up @@ -92,16 +94,11 @@ private static Stats[] foldStats(List<Partition> partitions) { // fold stats fro
var current = partitions.get(i).stats;
for (int j = 0; j < current.length; j++) {
if (current[j] != null) {
var t = target[j];
if (t == null) {
target[j] = current[j]; // copy ref from current to target
}
else {
t.min = Math.min(t.min, current[j].min);
t.max = Math.max(t.max, current[j].max);
t.sum += current[j].sum;
t.count += current[j].count;
}
var t = findInTable(target, current[j].hash, current[j].key, current[j].key.length);
t.min = Math.min(t.min, current[j].min);
t.max = Math.max(t.max, current[j].max);
t.sum += current[j].sum;
t.count += current[j].count;
}
}
}
Expand All @@ -114,7 +111,12 @@ private static void foldFootersAndHeaders(List<Partition> partitions) { // fold
var pPrev = partitions.get(i - 1);
var merged = mergeFooterAndHeader(pPrev.footer, pNext.header);
if (merged != null) {
doProcessBuffer(ByteBuffer.wrap(merged), true, pPrev.stats); // fold into prev partition
if (merged[merged.length - 1] == '\n') { // fold into prev partition
doProcessBuffer(ByteBuffer.wrap(merged), true, pPrev.stats);
}
else { // no newline appeared in partition, carry forward
pNext.footer = merged;
}
}
}
}
Expand All @@ -133,33 +135,37 @@ private static byte[] mergeFooterAndHeader(byte[] footer, byte[] header) {
}

private static Partition processBuffer(ByteBuffer buffer, boolean first) {
return doProcessBuffer(buffer, first, new Stats[HASH_MOD * 2]);
return doProcessBuffer(buffer, first, new Stats[HASH_TBL_SIZE + 1]);
}

private static Partition doProcessBuffer(ByteBuffer buffer, boolean first, Stats[] stats) {
var readingKey = true;
var keyHash = 0;
var keyStart = 0;
var negative = false;
var val = 0;
var header = first ? null : readHeader(buffer);
var readingKey = true; // reading key or value?
var keyBuf = new byte[MAX_KEY_SIZE]; // buffer for key
var keyPos = 0; // current position in key buffer
var keyHash = 0; // accumulating hash of key
var keyStart = buffer.position(); // start of key in buffer used for footer calc
var negative = false; // is value negative?
var val = 0; // accumulating value
Stats st = null;
while (buffer.hasRemaining()) {
var b = buffer.get();
if (readingKey) {
if (b == ';') {
var idx = HASH_MOD + keyHash % HASH_MOD;
if (b != ';') {
keyHash = HASH_FACTOR * keyHash + b;
keyBuf[keyPos++] = b;
}
else {
var idx = keyHash & HASH_TBL_SIZE;
st = stats[idx];
if (st == null) {
var key = new byte[buffer.position() - keyStart - 1];
buffer.get(keyStart, key, 0, key.length);
st = stats[idx] = new Stats(key);
if (st == null) { // nothing in table, eagerly claim spot
st = stats[idx] = newStats(keyBuf, keyPos, keyHash);
}
else if (!Arrays.equals(st.key, 0, st.key.length, keyBuf, 0, keyPos)) {
st = findInTable(stats, keyHash, keyBuf, keyPos);
}
readingKey = false;
}
else {
keyHash = HASH_FACTOR * keyHash + b;
}
}
else {
if (b == '\n') {
Expand All @@ -173,6 +179,7 @@ private static Partition doProcessBuffer(ByteBuffer buffer, boolean first, Stats
val = 0;
negative = false;
keyStart = buffer.position();
keyPos = 0;
}
else if (b == '-') {
negative = true;
Expand All @@ -186,6 +193,25 @@ else if (b != '.') { // skip '.' since fractional tenth unit after decimal point
return new Partition(header, footer, stats);
}

/**
 * Finds the entry for a key in the open-addressing table, inserting a new entry if none exists.
 *
 * <p>The scan starts at slot {@code hash & HASH_TBL_SIZE} and moves forward one slot at a time,
 * wrapping around at the end of the table, until it reaches either a slot whose stored key equals
 * the first {@code len} bytes of {@code key} or an empty slot. An empty slot is claimed for the key.
 *
 * @param stats table to search; modified when the key is not yet present
 * @param hash  hash of the key; selects the starting slot and is stored in a newly created entry
 * @param key   buffer whose first {@code len} bytes hold the key
 * @param len   number of valid key bytes in {@code key}
 * @return the existing entry for the key, or the newly inserted one
 * @throws IllegalStateException if every slot is occupied by a different key
 */
private static Stats findInTable(Stats[] stats, int hash, byte[] key, int len) { // open-addressing scan
    var idx = hash & HASH_TBL_SIZE;
    // Visit each slot at most once so a completely full table fails fast instead of probing forever.
    for (var probes = 0; probes <= HASH_TBL_SIZE; probes++) {
        var st = stats[idx];
        if (st == null) { // free slot: key is absent, claim the slot for it
            return stats[idx] = newStats(key, len, hash);
        }
        if (Arrays.equals(st.key, 0, st.key.length, key, 0, len)) {
            return st;
        }
        idx = (idx + 1) % (HASH_TBL_SIZE + 1); // linear probe with wrap-around
    }
    throw new IllegalStateException("hash table is full; cannot place key of length " + len);
}

/**
 * Creates a table entry that owns a private copy of the key bytes.
 *
 * @param buffer source buffer whose first {@code len} bytes are the key
 * @param len    number of bytes to copy from the start of {@code buffer}
 * @param hash   hash value to store alongside the key
 * @return a new {@code Stats} holding the copied key and the given hash
 */
private static Stats newStats(byte[] buffer, int len, int hash) {
    final byte[] keyCopy = new byte[len];
    // Copy rather than share: the entry must not alias the caller's buffer.
    System.arraycopy(buffer, 0, keyCopy, 0, keyCopy.length);
    final Stats created = new Stats(keyCopy, hash);
    return created;
}

private static byte[] readFooter(ByteBuffer buffer, int lineStart) { // read from line start to current pos (end-of-input)
var footer = new byte[buffer.position() - lineStart];
buffer.get(lineStart, footer, 0, footer.length);
Expand All @@ -200,18 +226,29 @@ private static byte[] readHeader(ByteBuffer buffer) { // read up to and includin
return header;
}

record Partition(byte[] header, byte[] footer, Stats[] stats) {
/**
 * Mutable holder for the pieces produced by processing one buffer: the leading header bytes,
 * the trailing footer bytes, and the stats table that was filled in.
 *
 * <p>Fields are deliberately not final; the footer is reassigned when a merged footer/header
 * has to be carried forward to the next partition.
 */
private static class Partition {
    /** Bytes read from the start of the buffer; {@code null} for the first partition. */
    byte[] header;
    /** Bytes from the start of the last line to the end of the buffer. */
    byte[] footer;
    /** Hash table of per-key stats accumulated for this partition. */
    Stats[] stats;

    Partition(byte[] headerBytes, byte[] footerBytes, Stats[] table) {
        header = headerBytes;
        footer = footerBytes;
        stats = table;
    }
}

private static class Stats { // min, max, and sum values are modeled with integral types that represent tenths of a unit
final byte[] key;
final int hash;
int min = Integer.MAX_VALUE;
int max = Integer.MIN_VALUE;
long sum;
long count;

Stats(byte[] key) {
Stats(byte[] key, int hash) {
this.key = key;
this.hash = hash;
}
}
}
Loading