From d3ba8a77e880a5f0c0fd11da71d771a425f8dd56 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Fri, 5 Jan 2024 15:56:23 +0000 Subject: [PATCH 01/21] Initial chunked impl --- calculate_average_JamalMulla.sh | 5 +- .../onebrc/CalculateAverage_JamalMulla.java | 317 ++++++------------ 2 files changed, 108 insertions(+), 214 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index 228d56bfb..6ce6afecf 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -15,5 +15,6 @@ # limitations under the License. # -JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" -java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 770588556..073a50eab 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -15,46 +15,29 @@ */ package dev.morling.onebrc; -import sun.misc.Unsafe; - +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.lang.foreign.Arena; -import java.lang.reflect.Field; +import java.nio.CharBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; public class CalculateAverage_JamalMulla { - private static final Map global = new HashMap<>(); private static final String FILE = "./measurements.txt"; - private static final Unsafe UNSAFE = initUnsafe(); - private static final Lock lock = new ReentrantLock(); - private static final int FNV_32_INIT = 0x811c9dc5; - private static final int FNV_32_PRIME = 0x01000193; - - private static Unsafe initUnsafe() { - try { - Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - return (Unsafe) theUnsafe.get(Unsafe.class); - } - catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } private static final class ResultRow { - private int min; - private int max; - private long sum; - private int count; + private double min; + private double max; - private ResultRow(int v) { + private double sum; + private long count; + + private ResultRow(double v) { this.min = v; this.max = v; this.sum = v; @@ -62,12 +45,42 @@ private ResultRow(int v) { } public String toString() { - return round(min) + "/" + round((double) (sum) / count) + "/" + round(max); + return round(min) + "/" + round(sum / count) + "/" + round(max); } private double round(double value) { - return Math.round(value) / 10.0; + return Math.round(value * 10.0) / 10.0; + } + + public double min() { + return min; + } + + public double mean() { + return sum / count; } + + public double max() { + return max; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ResultRow) obj; + return Double.doubleToLongBits(this.min) == Double.doubleToLongBits(that.min) && + Double.doubleToLongBits(this.sum) == Double.doubleToLongBits(that.sum) && + Double.doubleToLongBits(this.max) == Double.doubleToLongBits(that.max); + } + + @Override + public int hashCode() { + return Objects.hash(min, sum, max); + } + } private record Chunk(Long start, Long length) { @@ -75,229 +88,109 @@ private record Chunk(Long start, Long length) { static List getChunks(int numThreads, FileChannel channel) throws IOException { // get all chunk boundaries - final long filebytes = channel.size(); - final long roughChunkSize = filebytes / numThreads; - final List chunks = new ArrayList<>(numThreads); - final long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); + long filebytes = channel.size(); + long roughChunkSize = filebytes / numThreads; + List chunks = new ArrayList<>(); + long chunkStart = 0; - long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); - while (chunkStart < filebytes) { + long chunkLength = roughChunkSize; + for (int i = 0; i < numThreads - 1; i++) { // unlikely we need to read more than this many bytes to find the next newline - MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, - Math.min(Math.min(filebytes - chunkStart - chunkLength, chunkLength), 100)); - + MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, 100); while (mbb.get() != 0xA /* \n */) { chunkLength++; } - chunks.add(new Chunk(mappedAddress + chunkStart, chunkLength + 1)); + chunks.add(new Chunk(chunkStart, chunkLength + 1)); // to skip the nl in the next chunk chunkStart += chunkLength + 1; - chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); + chunkLength = roughChunkSize; } + // for the last chunk, we can set it to what's left + chunks.add(new Chunk(chunkStart, filebytes - chunkStart)); return chunks; } private static class CalculateTask implements Runnable { - private final SimplerHashMap results; + private final FileChannel channel; + private final Map results; private final Chunk chunk; - public CalculateTask(Chunk chunk) { - this.results = new SimplerHashMap(); + public CalculateTask(FileChannel fileChannel, Map results, Chunk chunk) { + this.channel = fileChannel; + this.results = results; this.chunk = chunk; } @Override public void run() { - // no names bigger than this - final byte[] nameBytes = new byte[100]; - short nameIndex = 0; - int ot; - // fnv hash - int hash = FNV_32_INIT; - - long i = chunk.start; - final long cl = chunk.start + chunk.length; - while (i < cl) { - byte c; - while ((c = UNSAFE.getByte(i++)) != 0x3B /* semi-colon */) { - nameBytes[nameIndex++] = c; - hash ^= c; - hash *= FNV_32_PRIME; + StringBuilder nameSb = new StringBuilder(); + StringBuilder valSb = new StringBuilder(); + boolean inName = true; + MappedByteBuffer mappedByteBuffer; + try { + mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, chunk.start, chunk.length); + } + catch (IOException e) { + throw new RuntimeException(e); + } + CharBuffer cb = StandardCharsets.UTF_8.decode(mappedByteBuffer); + int index = 0; + while (cb.hasRemaining()) { + char c = cb.get(); + if (c == ';') { + // no longer in name + inName = false; } - - // temperature value follows - c = UNSAFE.getByte(i++); - // we know the val has to be between -99.9 and 99.8 - // always with a single fractional digit - // represented as a byte array of either 4 or 5 characters - if (c == 0x2D /* minus sign */) { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 3) == 0xA) { - ot = (UNSAFE.getByte(i++) - 48) * 10; // char 1 + else if (c == '\n') { + // back to name and reset buffers at end + String name = nameSb.toString(); + double t = Double.parseDouble(valSb.toString()); + if (results.containsKey(name)) { + ResultRow rr = results.get(name); + rr.min = Math.min(rr.min, t); + rr.max = Math.max(rr.max, t); + rr.count++; + rr.sum += t; } else { - ot = (UNSAFE.getByte(i++) - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 + results.put(name, new ResultRow(t)); } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 2 - ot = -ot; + inName = true; + nameSb.setLength(0); + valSb.setLength(0); } - else { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 2) == 0xA) { - ot = (c - 48) * 10; // char 1 - } - else { - ot = (c - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 - } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 3 + else if (inName) { + nameSb.append(c); } - - i++;// nl - hash &= 65535; - results.putOrMerge(nameBytes, nameIndex, hash, ot); - // reset - nameIndex = 0; - hash = 0x811c9dc5; - } - - // merge results with overall results - List all = results.getAll(); - lock.lock(); - try { - for (MapEntry me : all) { - ResultRow rr; - ResultRow lr = me.row; - if ((rr = global.get(me.key)) != null) { - rr.min = Math.min(rr.min, lr.min); - rr.max = Math.max(rr.max, lr.max); - rr.count += lr.count; - rr.sum += lr.sum; - } - else { - global.put(me.key, lr); - } + else { + valSb.append(c); } - } - finally { - lock.unlock(); + index++; } } } public static void main(String[] args) throws IOException, InterruptedException { - FileChannel channel = new RandomAccessFile(FILE, "r").getChannel(); - int numThreads = 1; - if (channel.size() > 64000) { - numThreads = Runtime.getRuntime().availableProcessors(); - } + Map results = new ConcurrentHashMap<>(); + + RandomAccessFile raFile = new RandomAccessFile(FILE, "r"); + FileChannel channel = raFile.getChannel(); + + int numThreads = 64; List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); for (Chunk chunk : chunks) { - Thread thread = new Thread(new CalculateTask(chunk)); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); - threads.add(thread); + Thread t = Thread.ofVirtual().name(chunk.toString()).start(new CalculateTask(channel, results, chunk)); + threads.add(t); } + for (Thread t : threads) { t.join(); } - // create treemap just to sort - System.out.println(new TreeMap<>(global)); - } - - record MapEntry(String key, ResultRow row) { - } - - static class SimplerHashMap { - // can't have more than 10000 unique keys but want to match max hash - final int MAPSIZE = 65536; - final ResultRow[] slots = new ResultRow[MAPSIZE]; - final byte[][] keys = new byte[MAPSIZE][]; - - public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { - int slot = hash; - ResultRow slotValue; - - // Linear probe for open slot - while ((slotValue = slots[slot]) != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { - slot++; - } - - // existing - if (slotValue != null) { - slotValue.min = Math.min(slotValue.min, temp); - slotValue.max = Math.max(slotValue.max, temp); - slotValue.sum += temp; - slotValue.count++; - return; - } - - // new value - slots[slot] = new ResultRow(temp); - byte[] bytes = new byte[length]; - System.arraycopy(key, 0, bytes, 0, length); - keys[slot] = bytes; - } - - static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) { - // byte by byte comparisons are slow, so do as big chunks as possible - final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; - - short i = 0; - // round down to nearest power of 8 - for (; i < (length & -8); i += 8) { - if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b, i + baseOffset)) { - return false; - } - } - if (i == length) { - return true; - } - // leftover ints - for (; i < (length - i & -4); i += 4) { - if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b, i + baseOffset)) { - return false; - } - } - if (i == length) { - return true; - } - // leftover shorts - for (; i < (length - i & -2); i += 2) { - if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b, i + baseOffset)) { - return false; - } - } - if (i == length) { - return true; - } - // leftover bytes - for (; i < (length - i); i++) { - if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { - return false; - } - } - - return true; - } - // Get all pairs - public List getAll() { - final List result = new ArrayList<>(slots.length); - for (int i = 0; i < slots.length; i++) { - ResultRow slotValue = slots[i]; - if (slotValue != null) { - result.add(new MapEntry(new String(keys[i], StandardCharsets.UTF_8), slotValue)); - } - } - return result; - } + // just to sort + System.out.println(new TreeMap<>(results)); } - } From e564255caebf23b54c5d2cda130e4a35f00a12ba Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Fri, 5 Jan 2024 16:39:19 +0000 Subject: [PATCH 02/21] Bytes instead of chars --- .../onebrc/CalculateAverage_JamalMulla.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 073a50eab..8247f6c02 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -125,8 +125,9 @@ public CalculateTask(FileChannel fileChannel, Map results, Ch @Override public void run() { - StringBuilder nameSb = new StringBuilder(); - StringBuilder valSb = new StringBuilder(); + // no names bigger than this + byte[] nameBytes = new byte[127]; + byte[] valBytes = new byte[127]; boolean inName = true; MappedByteBuffer mappedByteBuffer; try { @@ -135,18 +136,18 @@ public void run() { catch (IOException e) { throw new RuntimeException(e); } - CharBuffer cb = StandardCharsets.UTF_8.decode(mappedByteBuffer); - int index = 0; - while (cb.hasRemaining()) { - char c = cb.get(); - if (c == ';') { + short nameIndex = 0; + short valIndex = 0; + while (mappedByteBuffer.hasRemaining()) { + byte c = mappedByteBuffer.get(); + if (c == 0x3B /* Semicolon */) { // no longer in name inName = false; } - else if (c == '\n') { + else if (c == 0xA /* Newline */) { // back to name and reset buffers at end - String name = nameSb.toString(); - double t = Double.parseDouble(valSb.toString()); + String name = new String(nameBytes, 0, nameIndex, StandardCharsets.UTF_8); + double t = Double.parseDouble(new String(valBytes, 0, valIndex, StandardCharsets.UTF_8)); if (results.containsKey(name)) { ResultRow rr = results.get(name); rr.min = Math.min(rr.min, t); @@ -158,16 +159,15 @@ else if (c == '\n') { results.put(name, new ResultRow(t)); } inName = true; - nameSb.setLength(0); - valSb.setLength(0); + nameIndex = 0; + valIndex = 0; } else if (inName) { - nameSb.append(c); + nameBytes[nameIndex++] = c; } else { - valSb.append(c); + valBytes[valIndex++] = c; } - index++; } } } From 81802e081a7a526939df1e1eeb61da73be19ef3e Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sat, 6 Jan 2024 17:52:08 +0000 Subject: [PATCH 03/21] Improved number parsing --- .../onebrc/CalculateAverage_JamalMulla.java | 92 ++++++++++++++++--- 1 file changed, 77 insertions(+), 15 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 8247f6c02..405a0eb77 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -111,15 +111,26 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep return chunks; } + private static int fnv(byte[] bytes, int length) { + int hash = 0x811c9dc5; + for (int i = 0; i < length; i++) { + hash ^= bytes[i]; + hash *= 0x01000193; + } + return hash; + } + private static class CalculateTask implements Runnable { private final FileChannel channel; private final Map results; + private final Map global; private final Chunk chunk; - public CalculateTask(FileChannel fileChannel, Map results, Chunk chunk) { + public CalculateTask(FileChannel fileChannel, Map global, Chunk chunk) { this.channel = fileChannel; - this.results = results; + this.results = new HashMap<>(); + this.global = global; this.chunk = chunk; } @@ -127,7 +138,6 @@ public CalculateTask(FileChannel fileChannel, Map results, Ch public void run() { // no names bigger than this byte[] nameBytes = new byte[127]; - byte[] valBytes = new byte[127]; boolean inName = true; MappedByteBuffer mappedByteBuffer; try { @@ -137,7 +147,7 @@ public void run() { throw new RuntimeException(e); } short nameIndex = 0; - short valIndex = 0; + double ot = 0; while (mappedByteBuffer.hasRemaining()) { byte c = mappedByteBuffer.get(); if (c == 0x3B /* Semicolon */) { @@ -147,28 +157,78 @@ public void run() { else if (c == 0xA /* Newline */) { // back to name and reset buffers at end String name = new String(nameBytes, 0, nameIndex, StandardCharsets.UTF_8); - double t = Double.parseDouble(new String(valBytes, 0, valIndex, StandardCharsets.UTF_8)); - if (results.containsKey(name)) { - ResultRow rr = results.get(name); - rr.min = Math.min(rr.min, t); - rr.max = Math.max(rr.max, t); + // int nameHash = fnv(nameBytes, nameIndex); + ResultRow rr; + if ((rr = results.get(name)) != null) { + rr.min = Math.min(rr.min, ot); + rr.max = Math.max(rr.max, ot); rr.count++; - rr.sum += t; + rr.sum += ot; } else { - results.put(name, new ResultRow(t)); + results.put(name, new ResultRow(ot)); } inName = true; nameIndex = 0; - valIndex = 0; } else if (inName) { nameBytes[nameIndex++] = c; } else { - valBytes[valIndex++] = c; + // we know the val has to be between -99.9 and 99.8 + // always with a single fractional digit + // represented as a byte array of either 4 or 5 characters + if (c == 0x2D /* minus sign */) { + // minus sign so number will be negative + + // could be either n.x or nn.x + // char 3 + // skip dot + if (mappedByteBuffer.get(mappedByteBuffer.position() + 3) == 0xA) { + ot = (mappedByteBuffer.get() - 48) * 10; // char 1 + } + else { + ot = (mappedByteBuffer.get() - 48) * 100; // char 1 + ot += (mappedByteBuffer.get() - 48) * 10; // char 2 + } + mappedByteBuffer.get(); // skip dot + ot += (mappedByteBuffer.get() - 48); // char 2 + ot = -(ot / 10f); + } + else { + + // could be either n.x or nn.x + // char 3 + // skip dot + if (mappedByteBuffer.get(mappedByteBuffer.position() + 2) == 0xA) { + ot = (c - 48) * 10; // char 1 + } + else { + ot = (c - 48) * 100; // char 1 + ot += (mappedByteBuffer.get() - 48) * 10; // char 2 + } + mappedByteBuffer.get(); // skip dot + ot += (mappedByteBuffer.get() - 48); // char 3 + ot = ot / 10f; + } + } + } + + // merge my results with overall results + for (String k : results.keySet()) { + ResultRow rr; + ResultRow lr = results.get(k); + if ((rr = global.get(k)) != null) { + rr.min = Math.min(rr.min, lr.min); + rr.max = Math.max(rr.max, lr.max); + rr.count += lr.count; + rr.sum += lr.sum; + } + else { + global.put(k, lr); } } + } } @@ -178,11 +238,13 @@ public static void main(String[] args) throws IOException, InterruptedException RandomAccessFile raFile = new RandomAccessFile(FILE, "r"); FileChannel channel = raFile.getChannel(); - int numThreads = 64; + int numThreads = Runtime.getRuntime().availableProcessors(); List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); for (Chunk chunk : chunks) { - Thread t = Thread.ofVirtual().name(chunk.toString()).start(new CalculateTask(channel, results, chunk)); + // Thread t = Thread.ofVirtual().name(chunk.toString()).start(new CalculateTask(channel, results, chunk)); + Thread t = new Thread(new CalculateTask(channel, results, chunk)); + t.start(); threads.add(t); } From 78e3c3b198a91e7094d26f299eb771291333d421 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sun, 7 Jan 2024 00:31:03 +0000 Subject: [PATCH 04/21] Custom hashmap --- .../onebrc/CalculateAverage_JamalMulla.java | 87 ++++++++++++------- 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 405a0eb77..5ff4206db 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -123,13 +123,13 @@ private static int fnv(byte[] bytes, int length) { private static class CalculateTask implements Runnable { private final FileChannel channel; - private final Map results; + private final SimplerHashMap results; private final Map global; private final Chunk chunk; public CalculateTask(FileChannel fileChannel, Map global, Chunk chunk) { this.channel = fileChannel; - this.results = new HashMap<>(); + this.results = new SimplerHashMap(); this.global = global; this.chunk = chunk; } @@ -155,19 +155,7 @@ public void run() { inName = false; } else if (c == 0xA /* Newline */) { - // back to name and reset buffers at end - String name = new String(nameBytes, 0, nameIndex, StandardCharsets.UTF_8); - // int nameHash = fnv(nameBytes, nameIndex); - ResultRow rr; - if ((rr = results.get(name)) != null) { - rr.min = Math.min(rr.min, ot); - rr.max = Math.max(rr.max, ot); - rr.count++; - rr.sum += ot; - } - else { - results.put(name, new ResultRow(ot)); - } + results.putOrMerge(nameBytes, nameIndex, ot); inName = true; nameIndex = 0; } @@ -179,11 +167,7 @@ else if (inName) { // always with a single fractional digit // represented as a byte array of either 4 or 5 characters if (c == 0x2D /* minus sign */) { - // minus sign so number will be negative - // could be either n.x or nn.x - // char 3 - // skip dot if (mappedByteBuffer.get(mappedByteBuffer.position() + 3) == 0xA) { ot = (mappedByteBuffer.get() - 48) * 10; // char 1 } @@ -196,10 +180,7 @@ else if (inName) { ot = -(ot / 10f); } else { - // could be either n.x or nn.x - // char 3 - // skip dot if (mappedByteBuffer.get(mappedByteBuffer.position() + 2) == 0xA) { ot = (c - 48) * 10; // char 1 } @@ -214,21 +195,20 @@ else if (inName) { } } - // merge my results with overall results - for (String k : results.keySet()) { + // merge results with overall results + for (MapEntry me : results.getAll()) { ResultRow rr; - ResultRow lr = results.get(k); - if ((rr = global.get(k)) != null) { + ResultRow lr = me.row; + if ((rr = global.get(me.key)) != null) { rr.min = Math.min(rr.min, lr.min); rr.max = Math.max(rr.max, lr.max); rr.count += lr.count; rr.sum += lr.sum; } else { - global.put(k, lr); + global.put(me.key, lr); } } - } } @@ -241,8 +221,7 @@ public static void main(String[] args) throws IOException, InterruptedException int numThreads = Runtime.getRuntime().availableProcessors(); List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); - for (Chunk chunk : chunks) { - // Thread t = Thread.ofVirtual().name(chunk.toString()).start(new CalculateTask(channel, results, chunk)); + for (Chunk chunk : chunks){ Thread t = new Thread(new CalculateTask(channel, results, chunk)); t.start(); threads.add(t); @@ -255,4 +234,52 @@ public static void main(String[] args) throws IOException, InterruptedException // just to sort System.out.println(new TreeMap<>(results)); } + + record MapEntry(String key, ResultRow row) { + } + + static class SimplerHashMap { + // can't have more than 10000 unique keys but need size to be power of 2 for masking + int MAPSIZE = 16384; + ResultRow[] slots = new ResultRow[MAPSIZE]; + byte[][] keys = new byte[MAPSIZE][]; + + public void putOrMerge(byte[] key, int length, double temp) { + int hash = fnv(key, length); + int slot = hash & (MAPSIZE - 1); + ResultRow slotValue = slots[slot]; + + // Linear probe for open slot + while (slotValue != null && (keys[slot].length != length || !Arrays.equals(keys[slot], 0, length, key, 0, length))) { + slot = (slot + 1) & (MAPSIZE - 1); + slotValue = slots[slot]; + } + ResultRow value = slotValue; + if (value == null) { + slots[slot] = new ResultRow(temp); + byte[] bytes = new byte[length]; + System.arraycopy(key, 0, bytes, 0, length); + keys[slot] = bytes; + } + else { + value.min = Math.min(value.min, temp); + value.max = Math.max(value.max, temp); + value.sum += temp; + value.count += 1; + } + } + + // Get all pairs + public List getAll() { + List result = new ArrayList<>(slots.length); + for (int i = 0; i < slots.length; i++) { + ResultRow slotValue = slots[i]; + if (slotValue != null) { + result.add(new MapEntry(new String(keys[i], StandardCharsets.UTF_8), slotValue)); + } + } + return result; + } + } + } From 7e40d06d4644a4eeae75cc2205c464dd5caab17f Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 10:06:01 +0000 Subject: [PATCH 05/21] Graal and some tuning --- ...ulla.sh => calculate_average_jamalmulla.sh | 7 +- ....java => CalculateAverage_jamalmulla.java} | 75 +++++++++---------- 2 files changed, 39 insertions(+), 43 deletions(-) rename calculate_average_JamalMulla.sh => calculate_average_jamalmulla.sh (79%) rename src/main/java/dev/morling/onebrc/{CalculateAverage_JamalMulla.java => CalculateAverage_jamalmulla.java} (82%) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_jamalmulla.sh similarity index 79% rename from calculate_average_JamalMulla.sh rename to calculate_average_jamalmulla.sh index 6ce6afecf..5cfad3034 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_jamalmulla.sh @@ -15,6 +15,7 @@ # limitations under the License. # - -JAVA_OPTS="" -time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.1-graal 1>&2 +JAVA_OPTS="--enable-preview -XX:+AlwaysPreTouch" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_jamalmulla diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java similarity index 82% rename from src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java rename to src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java index 5ff4206db..fb3f4eef6 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java @@ -15,18 +15,15 @@ */ package dev.morling.onebrc; -import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.CharBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -public class CalculateAverage_JamalMulla { +public class CalculateAverage_jamalmulla { private static final String FILE = "./measurements.txt"; @@ -52,18 +49,6 @@ private double round(double value) { return Math.round(value * 10.0) / 10.0; } - public double min() { - return min; - } - - public double mean() { - return sum / count; - } - - public double max() { - return max; - } - @Override public boolean equals(Object obj) { if (obj == this) @@ -111,13 +96,13 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep return chunks; } - private static int fnv(byte[] bytes, int length) { + private static int fnv(final byte[] bytes, int length) { int hash = 0x811c9dc5; for (int i = 0; i < length; i++) { hash ^= bytes[i]; hash *= 0x01000193; } - return hash; + return ((hash >> 16) ^ hash) & 65535; } private static class CalculateTask implements Runnable { @@ -137,7 +122,7 @@ public CalculateTask(FileChannel fileChannel, Map global, Chu @Override public void run() { // no names bigger than this - byte[] nameBytes = new byte[127]; + byte[] nameBytes = new byte[100]; boolean inName = true; MappedByteBuffer mappedByteBuffer; try { @@ -148,8 +133,11 @@ public void run() { } short nameIndex = 0; double ot = 0; - while (mappedByteBuffer.hasRemaining()) { - byte c = mappedByteBuffer.get(); + + int i = 0; + long cl = chunk.length; + while (i < cl) { + byte c = mappedByteBuffer.get(i++); if (c == 0x3B /* Semicolon */) { // no longer in name inName = false; @@ -168,28 +156,28 @@ else if (inName) { // represented as a byte array of either 4 or 5 characters if (c == 0x2D /* minus sign */) { // could be either n.x or nn.x - if (mappedByteBuffer.get(mappedByteBuffer.position() + 3) == 0xA) { - ot = (mappedByteBuffer.get() - 48) * 10; // char 1 + if (mappedByteBuffer.get(i + 3) == 0xA) { + ot = (mappedByteBuffer.get(i++) - 48) * 10; // char 1 } else { - ot = (mappedByteBuffer.get() - 48) * 100; // char 1 - ot += (mappedByteBuffer.get() - 48) * 10; // char 2 + ot = (mappedByteBuffer.get(i++) - 48) * 100; // char 1 + ot += (mappedByteBuffer.get(i++) - 48) * 10; // char 2 } - mappedByteBuffer.get(); // skip dot - ot += (mappedByteBuffer.get() - 48); // char 2 + mappedByteBuffer.get(i++); // skip dot + ot += (mappedByteBuffer.get(i++) - 48); // char 2 ot = -(ot / 10f); } else { // could be either n.x or nn.x - if (mappedByteBuffer.get(mappedByteBuffer.position() + 2) == 0xA) { + if (mappedByteBuffer.get(i + 2) == 0xA) { ot = (c - 48) * 10; // char 1 } else { ot = (c - 48) * 100; // char 1 - ot += (mappedByteBuffer.get() - 48) * 10; // char 2 + ot += (mappedByteBuffer.get(i++) - 48) * 10; // char 2 } - mappedByteBuffer.get(); // skip dot - ot += (mappedByteBuffer.get() - 48); // char 3 + mappedByteBuffer.get(i++); // skip dot + ot += (mappedByteBuffer.get(i++) - 48); // char 3 ot = ot / 10f; } } @@ -221,7 +209,7 @@ public static void main(String[] args) throws IOException, InterruptedException int numThreads = Runtime.getRuntime().availableProcessors(); List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); - for (Chunk chunk : chunks){ + for (Chunk chunk : chunks) { Thread t = new Thread(new CalculateTask(channel, results, chunk)); t.start(); threads.add(t); @@ -239,20 +227,19 @@ record MapEntry(String key, ResultRow row) { } static class SimplerHashMap { - // can't have more than 10000 unique keys but need size to be power of 2 for masking - int MAPSIZE = 16384; + // based on spullara'ss + // can't have more than 10000 unique keys butwant to match max hash + int MAPSIZE = 65536; ResultRow[] slots = new ResultRow[MAPSIZE]; byte[][] keys = new byte[MAPSIZE][]; public void putOrMerge(byte[] key, int length, double temp) { - int hash = fnv(key, length); - int slot = hash & (MAPSIZE - 1); + int slot = fnv(key, length); ResultRow slotValue = slots[slot]; // Linear probe for open slot - while (slotValue != null && (keys[slot].length != length || !Arrays.equals(keys[slot], 0, length, key, 0, length))) { - slot = (slot + 1) & (MAPSIZE - 1); - slotValue = slots[slot]; + while (slotValue != null && (keys[slot].length != length || !arrayEquals(keys[slot], key, length))) { + slotValue = slots[++slot]; } ResultRow value = slotValue; if (value == null) { @@ -265,8 +252,16 @@ public void putOrMerge(byte[] key, int length, double temp) { value.min = Math.min(value.min, temp); value.max = Math.max(value.max, temp); value.sum += temp; - value.count += 1; + value.count++; + } + } + + private boolean arrayEquals(final byte[] a, final byte[] b, final int length) { + for (int i = 0; i < length; i++) { + if (a[i] != b[i]) + return false; } + return true; } // Get all pairs From 31de444dbc358a40d5f32b4481cc0ef9e24be5fa Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 10:51:37 +0000 Subject: [PATCH 06/21] Fix segmenting --- .../onebrc/CalculateAverage_jamalmulla.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java index fb3f4eef6..8c474d562 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java @@ -76,12 +76,16 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep long filebytes = channel.size(); long roughChunkSize = filebytes / numThreads; List chunks = new ArrayList<>(); + // System.out.println("filebytes:" + filebytes + " roughsize: " + roughChunkSize + " numthreads: " + numThreads); long chunkStart = 0; - long chunkLength = roughChunkSize; - for (int i = 0; i < numThreads - 1; i++) { + long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); + while (chunkStart < filebytes) { // unlikely we need to read more than this many bytes to find the next newline - MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, 100); + // System.out.println("Chunk start: " + chunkStart + " chunkLength: " + chunkLength); + MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, + Math.min(Math.min(filebytes - chunkStart - chunkLength, chunkLength), 100)); + while (mbb.get() != 0xA /* \n */) { chunkLength++; } @@ -89,10 +93,11 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep chunks.add(new Chunk(chunkStart, chunkLength + 1)); // to skip the nl in the next chunk chunkStart += chunkLength + 1; - chunkLength = roughChunkSize; + chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); } + // System.out.println(chunks); // for the last chunk, we can set it to what's left - chunks.add(new Chunk(chunkStart, filebytes - chunkStart)); + // chunks.add(new Chunk(chunkStart, filebytes - chunkStart)); return chunks; } @@ -205,8 +210,10 @@ public static void main(String[] args) throws IOException, InterruptedException RandomAccessFile raFile = new RandomAccessFile(FILE, "r"); FileChannel channel = raFile.getChannel(); - - int numThreads = Runtime.getRuntime().availableProcessors(); + int numThreads = 1; + if (channel.size() > 64000) { + numThreads = Runtime.getRuntime().availableProcessors(); + } List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); for (Chunk chunk : chunks) { @@ -218,7 +225,6 @@ public static void main(String[] args) throws IOException, InterruptedException for (Thread t : threads) { t.join(); } - // just to sort System.out.println(new TreeMap<>(results)); } From 42b34d4a7163082dc2c077c70fa314b5867d9bcd Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 11:21:55 +0000 Subject: [PATCH 07/21] Fix casing --- ...ate_average_jamalmulla.sh => calculate_average_JamalMulla.sh | 2 +- ...Average_jamalmulla.java => CalculateAverage_JamalMulla.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename calculate_average_jamalmulla.sh => calculate_average_JamalMulla.sh (93%) rename src/main/java/dev/morling/onebrc/{CalculateAverage_jamalmulla.java => CalculateAverage_JamalMulla.java} (99%) diff --git a/calculate_average_jamalmulla.sh b/calculate_average_JamalMulla.sh similarity index 93% rename from calculate_average_jamalmulla.sh rename to calculate_average_JamalMulla.sh index 5cfad3034..2a5233acf 100755 --- a/calculate_average_jamalmulla.sh +++ b/calculate_average_JamalMulla.sh @@ -18,4 +18,4 @@ source "$HOME/.sdkman/bin/sdkman-init.sh" sdk use java 21.0.1-graal 1>&2 JAVA_OPTS="--enable-preview -XX:+AlwaysPreTouch" -time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_jamalmulla +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java similarity index 99% rename from src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java rename to src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 8c474d562..46ac730e2 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jamalmulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -23,7 +23,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -public class CalculateAverage_jamalmulla { +public class CalculateAverage_JamalMulla { private static final String FILE = "./measurements.txt"; From c3b5966b577b955f607d9b8382d857d9160cd31f Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 13:59:26 +0000 Subject: [PATCH 08/21] Unsafe --- .../onebrc/CalculateAverage_JamalMulla.java | 76 +++++++++++++------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 46ac730e2..91ce1af32 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -15,8 +15,12 @@ */ package dev.morling.onebrc; +import sun.misc.Unsafe; + import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.foreign.Arena; +import java.lang.reflect.Field; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; @@ -27,6 +31,19 @@ public class CalculateAverage_JamalMulla { private static final String FILE = "./measurements.txt"; + private static final Unsafe UNSAFE = initUnsafe(); + + private static Unsafe initUnsafe() { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + return (Unsafe) theUnsafe.get(Unsafe.class); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + private static final class ResultRow { private double min; private double max; @@ -76,6 +93,7 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep long filebytes = channel.size(); long roughChunkSize = filebytes / numThreads; List chunks = new ArrayList<>(); + long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); // System.out.println("filebytes:" + filebytes + " roughsize: " + roughChunkSize + " numthreads: " + numThreads); long chunkStart = 0; @@ -90,7 +108,7 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep chunkLength++; } - chunks.add(new Chunk(chunkStart, chunkLength + 1)); + chunks.add(new Chunk(mappedAddress + chunkStart, chunkLength + 1)); // to skip the nl in the next chunk chunkStart += chunkLength + 1; chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); @@ -129,20 +147,20 @@ public void run() { // no names bigger than this byte[] nameBytes = new byte[100]; boolean inName = true; - MappedByteBuffer mappedByteBuffer; - try { - mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, chunk.start, chunk.length); - } - catch (IOException e) { - throw new RuntimeException(e); - } + // MappedByteBuffer mappedByteBuffer; + // try { + // mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, chunk.start, chunk.length); + // } + // catch (IOException e) { + // throw new RuntimeException(e); + // } short nameIndex = 0; double ot = 0; - int i = 0; - long cl = chunk.length; + long i = chunk.start; + final long cl = chunk.start + chunk.length; while (i < cl) { - byte c = mappedByteBuffer.get(i++); + byte c = UNSAFE.getByte(i++); if (c == 0x3B /* Semicolon */) { // no longer in name inName = false; @@ -161,28 +179,28 @@ else if (inName) { // represented as a byte array of either 4 or 5 characters if (c == 0x2D /* minus sign */) { // could be either n.x or nn.x - if (mappedByteBuffer.get(i + 3) == 0xA) { - ot = (mappedByteBuffer.get(i++) - 48) * 10; // char 1 + if (UNSAFE.getByte(i + 3) == 0xA) { + ot = (UNSAFE.getByte(i++) - 48) * 10; // char 1 } else { - ot = (mappedByteBuffer.get(i++) - 48) * 100; // char 1 - ot += (mappedByteBuffer.get(i++) - 48) * 10; // char 2 + ot = (UNSAFE.getByte(i++) - 48) * 100; // char 1 + ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } - mappedByteBuffer.get(i++); // skip dot - ot += (mappedByteBuffer.get(i++) - 48); // char 2 + i++; // skip dot + ot += (UNSAFE.getByte(i++) - 48); // char 2 ot = -(ot / 10f); } else { // could be either n.x or nn.x - if (mappedByteBuffer.get(i + 2) == 0xA) { + if (UNSAFE.getByte(i + 2) == 0xA) { ot = (c - 48) * 10; // char 1 } else { ot = (c - 48) * 100; // char 1 - ot += (mappedByteBuffer.get(i++) - 48) * 10; // char 2 + ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } - mappedByteBuffer.get(i++); // skip dot - ot += (mappedByteBuffer.get(i++) - 48); // char 3 + i++; // skip dot + ot += (UNSAFE.getByte(i++) - 48); // char 3 ot = ot / 10f; } } @@ -244,7 +262,7 @@ public void putOrMerge(byte[] key, int length, double temp) { ResultRow slotValue = slots[slot]; // Linear probe for open slot - while (slotValue != null && (keys[slot].length != length || !arrayEquals(keys[slot], key, length))) { + while (slotValue != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { slotValue = slots[++slot]; } ResultRow value = slotValue; @@ -255,8 +273,8 @@ public void putOrMerge(byte[] key, int length, double temp) { keys[slot] = bytes; } else { - value.min = Math.min(value.min, temp); - value.max = Math.max(value.max, temp); + value.min = (value.min <= temp) ? value.min : temp; + value.max = (value.max >= temp) ? value.max : temp; value.sum += temp; value.count++; } @@ -270,6 +288,16 @@ private boolean arrayEquals(final byte[] a, final byte[] b, final int length) { return true; } + static boolean unsafeEquals(final byte[] a, final byte[] b, final int length) { + int baseOffset = UNSAFE.arrayBaseOffset(byte[].class); + for (int i = 0; i < length; i++) { + if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { + return false; + } + } + return true; + } + // Get all pairs public List getAll() { List result = new ArrayList<>(slots.length); From 6ab6882c2d790266be55eb7f415605580d52fc6f Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 15:04:48 +0000 Subject: [PATCH 09/21] Inlining hash calc --- calculate_average_JamalMulla.sh | 2 +- .../onebrc/CalculateAverage_JamalMulla.java | 56 ++++++++----------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index 2a5233acf..7b9a603a0 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -17,5 +17,5 @@ source "$HOME/.sdkman/bin/sdkman-init.sh" sdk use java 21.0.1-graal 1>&2 -JAVA_OPTS="--enable-preview -XX:+AlwaysPreTouch" +JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseNUMA -XX:+UseTransparentHugePages" time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 91ce1af32..34075fa88 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -45,13 +45,13 @@ private static Unsafe initUnsafe() { } private static final class ResultRow { - private double min; - private double max; + private int min; + private int max; - private double sum; - private long count; + private long sum; + private int count; - private ResultRow(double v) { + private ResultRow(int v) { this.min = v; this.max = v; this.sum = v; @@ -59,11 +59,11 @@ private ResultRow(double v) { } public String toString() { - return round(min) + "/" + round(sum / count) + "/" + round(max); + return round(min) + "/" + round((double) (sum) / count) + "/" + round(max); } private double round(double value) { - return Math.round(value * 10.0) / 10.0; + return Math.round(value) / 10.0; } @Override @@ -147,15 +147,9 @@ public void run() { // no names bigger than this byte[] nameBytes = new byte[100]; boolean inName = true; - // MappedByteBuffer mappedByteBuffer; - // try { - // mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, chunk.start, chunk.length); - // } - // catch (IOException e) { - // throw new RuntimeException(e); - // } short nameIndex = 0; - double ot = 0; + int ot = 0; + int hash = 0x811c9dc5; long i = chunk.start; final long cl = chunk.start + chunk.length; @@ -166,12 +160,16 @@ public void run() { inName = false; } else if (c == 0xA /* Newline */) { - results.putOrMerge(nameBytes, nameIndex, ot); + hash = ((hash >> 16) ^ hash) & 65535; + results.putOrMerge(nameBytes, nameIndex, hash, ot); inName = true; nameIndex = 0; + hash = 0x811c9dc5; } else if (inName) { nameBytes[nameIndex++] = c; + hash ^= c; + hash *= 0x01000193; } else { // we know the val has to be between -99.9 and 99.8 @@ -188,7 +186,7 @@ else if (inName) { } i++; // skip dot ot += (UNSAFE.getByte(i++) - 48); // char 2 - ot = -(ot / 10f); + ot = -ot; } else { // could be either n.x or nn.x @@ -201,7 +199,6 @@ else if (inName) { } i++; // skip dot ot += (UNSAFE.getByte(i++) - 48); // char 3 - ot = ot / 10f; } } } @@ -257,37 +254,28 @@ static class SimplerHashMap { ResultRow[] slots = new ResultRow[MAPSIZE]; byte[][] keys = new byte[MAPSIZE][]; - public void putOrMerge(byte[] key, int length, double temp) { - int slot = fnv(key, length); + public void putOrMerge(byte[] key, int length, int hash, int temp) { + int slot = hash; ResultRow slotValue = slots[slot]; // Linear probe for open slot while (slotValue != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { slotValue = slots[++slot]; } - ResultRow value = slotValue; - if (value == null) { + if (slotValue == null) { slots[slot] = new ResultRow(temp); byte[] bytes = new byte[length]; System.arraycopy(key, 0, bytes, 0, length); keys[slot] = bytes; } else { - value.min = (value.min <= temp) ? value.min : temp; - value.max = (value.max >= temp) ? value.max : temp; - value.sum += temp; - value.count++; + slotValue.min = Math.min(slotValue.min, temp); + slotValue.max = Math.max(slotValue.max, temp); + slotValue.sum += temp; + slotValue.count++; } } - private boolean arrayEquals(final byte[] a, final byte[] b, final int length) { - for (int i = 0; i < length; i++) { - if (a[i] != b[i]) - return false; - } - return true; - } - static boolean unsafeEquals(final byte[] a, final byte[] b, final int length) { int baseOffset = UNSAFE.arrayBaseOffset(byte[].class); for (int i = 0; i < length; i++) { From c7ad3746ce4fda78867bf7f4d47c4b0c3d29682a Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 16:20:49 +0000 Subject: [PATCH 10/21] Improved loop --- .../onebrc/CalculateAverage_JamalMulla.java | 76 +++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 34075fa88..8ebac4ad6 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -146,61 +146,57 @@ public CalculateTask(FileChannel fileChannel, Map global, Chu public void run() { // no names bigger than this byte[] nameBytes = new byte[100]; - boolean inName = true; short nameIndex = 0; - int ot = 0; + int ot; int hash = 0x811c9dc5; long i = chunk.start; final long cl = chunk.start + chunk.length; while (i < cl) { - byte c = UNSAFE.getByte(i++); - if (c == 0x3B /* Semicolon */) { - // no longer in name - inName = false; - } - else if (c == 0xA /* Newline */) { - hash = ((hash >> 16) ^ hash) & 65535; - results.putOrMerge(nameBytes, nameIndex, hash, ot); - inName = true; - nameIndex = 0; - hash = 0x811c9dc5; - } - else if (inName) { + byte c; + while ((c = UNSAFE.getByte(i++)) != 0x3B) { nameBytes[nameIndex++] = c; hash ^= c; hash *= 0x01000193; } + + // the temp + c = UNSAFE.getByte(i++); + // we know the val has to be between -99.9 and 99.8 + // always with a single fractional digit + // represented as a byte array of either 4 or 5 characters + if (c == 0x2D /* minus sign */) { + // could be either n.x or nn.x + if (UNSAFE.getByte(i + 3) == 0xA) { + ot = (UNSAFE.getByte(i++) - 48) * 10; // char 1 + } + else { + ot = (UNSAFE.getByte(i++) - 48) * 100; // char 1 + ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 + } + i++; // skip dot + ot += (UNSAFE.getByte(i++) - 48); // char 2 + ot = -ot; + } else { - // we know the val has to be between -99.9 and 99.8 - // always with a single fractional digit - // represented as a byte array of either 4 or 5 characters - if (c == 0x2D /* minus sign */) { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 3) == 0xA) { - ot = (UNSAFE.getByte(i++) - 48) * 10; // char 1 - } - else { - ot = (UNSAFE.getByte(i++) - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 - } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 2 - ot = -ot; + // could be either n.x or nn.x + if (UNSAFE.getByte(i + 2) == 0xA) { + ot = (c - 48) * 10; // char 1 } else { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 2) == 0xA) { - ot = (c - 48) * 10; // char 1 - } - else { - ot = (c - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 - } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 3 + ot = (c - 48) * 100; // char 1 + ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } + i++; // skip dot + ot += (UNSAFE.getByte(i++) - 48); // char 3 } + + i++;// nl + hash = ((hash >> 16) ^ hash) & 65535; + results.putOrMerge(nameBytes, nameIndex, hash, ot); + // reset + nameIndex = 0; + hash = 0x811c9dc5; } // merge results with overall results From 4eb2eda883249d781646b4f6542cd44b85867dca Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 17:12:14 +0000 Subject: [PATCH 11/21] Cleanup --- .../onebrc/CalculateAverage_JamalMulla.java | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 8ebac4ad6..d27e226b5 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -90,17 +90,14 @@ private record Chunk(Long start, Long length) { static List getChunks(int numThreads, FileChannel channel) throws IOException { // get all chunk boundaries - long filebytes = channel.size(); - long roughChunkSize = filebytes / numThreads; - List chunks = new ArrayList<>(); - long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); - // System.out.println("filebytes:" + filebytes + " roughsize: " + roughChunkSize + " numthreads: " + numThreads); - + final long filebytes = channel.size(); + final long roughChunkSize = filebytes / numThreads; + final List chunks = new ArrayList<>(); + final long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); long chunkStart = 0; long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); while (chunkStart < filebytes) { // unlikely we need to read more than this many bytes to find the next newline - // System.out.println("Chunk start: " + chunkStart + " chunkLength: " + chunkLength); MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, Math.min(Math.min(filebytes - chunkStart - chunkLength, chunkLength), 100)); @@ -113,21 +110,9 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep chunkStart += chunkLength + 1; chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); } - // System.out.println(chunks); - // for the last chunk, we can set it to what's left - // chunks.add(new Chunk(chunkStart, filebytes - chunkStart)); return chunks; } - private static int fnv(final byte[] bytes, int length) { - int hash = 0x811c9dc5; - for (int i = 0; i < length; i++) { - hash ^= bytes[i]; - hash *= 0x01000193; - } - return ((hash >> 16) ^ hash) & 65535; - } - private static class CalculateTask implements Runnable { private final FileChannel channel; @@ -250,7 +235,7 @@ static class SimplerHashMap { ResultRow[] slots = new ResultRow[MAPSIZE]; byte[][] keys = new byte[MAPSIZE][]; - public void putOrMerge(byte[] key, int length, int hash, int temp) { + public void putOrMerge(final byte[] key, final int length, final int hash, final int temp) { int slot = hash; ResultRow slotValue = slots[slot]; From 58c33ff242de4447fa9d87b3d2a03ee69e076fca Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 22:37:38 +0000 Subject: [PATCH 12/21] Speeding up equals --- .../onebrc/CalculateAverage_JamalMulla.java | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index d27e226b5..0109debd2 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -30,7 +30,6 @@ public class CalculateAverage_JamalMulla { private static final String FILE = "./measurements.txt"; - private static final Unsafe UNSAFE = initUnsafe(); private static Unsafe initUnsafe() { @@ -47,7 +46,6 @@ private static Unsafe initUnsafe() { private static final class ResultRow { private int min; private int max; - private long sum; private int count; @@ -65,24 +63,6 @@ public String toString() { private double round(double value) { return Math.round(value) / 10.0; } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (ResultRow) obj; - return Double.doubleToLongBits(this.min) == Double.doubleToLongBits(that.min) && - Double.doubleToLongBits(this.sum) == Double.doubleToLongBits(that.sum) && - Double.doubleToLongBits(this.max) == Double.doubleToLongBits(that.max); - } - - @Override - public int hashCode() { - return Objects.hash(min, sum, max); - } - } private record Chunk(Long start, Long length) { @@ -92,7 +72,7 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep // get all chunk boundaries final long filebytes = channel.size(); final long roughChunkSize = filebytes / numThreads; - final List chunks = new ArrayList<>(); + final List chunks = new ArrayList<>(numThreads); final long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); long chunkStart = 0; long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); @@ -115,13 +95,11 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep private static class CalculateTask implements Runnable { - private final FileChannel channel; private final SimplerHashMap results; private final Map global; private final Chunk chunk; - public CalculateTask(FileChannel fileChannel, Map global, Chunk chunk) { - this.channel = fileChannel; + public CalculateTask(Map global, Chunk chunk) { this.results = new SimplerHashMap(); this.global = global; this.chunk = chunk; @@ -133,6 +111,7 @@ public void run() { byte[] nameBytes = new byte[100]; short nameIndex = 0; int ot; + // fnv hash int hash = 0x811c9dc5; long i = chunk.start; @@ -145,7 +124,7 @@ public void run() { hash *= 0x01000193; } - // the temp + // temperature value follows c = UNSAFE.getByte(i++); // we know the val has to be between -99.9 and 99.8 // always with a single fractional digit @@ -213,15 +192,14 @@ public static void main(String[] args) throws IOException, InterruptedException List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); for (Chunk chunk : chunks) { - Thread t = new Thread(new CalculateTask(channel, results, chunk)); - t.start(); - threads.add(t); + Thread thread = new Thread(new CalculateTask(results, chunk)); + thread.start(); + threads.add(thread); } - for (Thread t : threads) { t.join(); } - // just to sort + // create treemap just to sort System.out.println(new TreeMap<>(results)); } @@ -235,35 +213,57 @@ static class SimplerHashMap { ResultRow[] slots = new ResultRow[MAPSIZE]; byte[][] keys = new byte[MAPSIZE][]; - public void putOrMerge(final byte[] key, final int length, final int hash, final int temp) { + public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { int slot = hash; ResultRow slotValue = slots[slot]; - - // Linear probe for open slot - while (slotValue != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { - slotValue = slots[++slot]; - } - if (slotValue == null) { - slots[slot] = new ResultRow(temp); - byte[] bytes = new byte[length]; - System.arraycopy(key, 0, bytes, 0, length); - keys[slot] = bytes; - } - else { + // found existing so update + if (slotValue != null && keys[slot].length == length && unsafeEquals(keys[slot], key, length)) { slotValue.min = Math.min(slotValue.min, temp); slotValue.max = Math.max(slotValue.max, temp); slotValue.sum += temp; slotValue.count++; + return; } + // Linear probe for open slot + while (slotValue != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { + slotValue = slots[++slot]; + } + slots[slot] = new ResultRow(temp); + byte[] bytes = new byte[length]; + System.arraycopy(key, 0, bytes, 0, length); + keys[slot] = bytes; } - static boolean unsafeEquals(final byte[] a, final byte[] b, final int length) { - int baseOffset = UNSAFE.arrayBaseOffset(byte[].class); - for (int i = 0; i < length; i++) { + static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) { + // byte by byte comparisons are slow, so do as big chunks as possible + final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; + + short i = 0; + // round down to nearest power of 8 + for (; i < (length & -8); i += 8) { + if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b, i + baseOffset)) { + return false; + } + } + // leftover ints + for (; i < (length - i & -4); i += 4) { + if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b, i + baseOffset)) { + return false; + } + } + // leftover shorts + for (; i < (length - i & -2); i += 2) { + if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b, i + baseOffset)) { + return false; + } + } + // leftover bytes + for (; i < (length - i); i++) { if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { return false; } } + return true; } From 756120a63344cb1456e16b77c479c805889c0d50 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Mon, 8 Jan 2024 22:43:39 +0000 Subject: [PATCH 13/21] Simplifying hash --- .../onebrc/CalculateAverage_JamalMulla.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 0109debd2..9cc96ce03 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -118,7 +118,7 @@ public void run() { final long cl = chunk.start + chunk.length; while (i < cl) { byte c; - while ((c = UNSAFE.getByte(i++)) != 0x3B) { + while ((c = UNSAFE.getByte(i++)) != 0x3B /* semi-colon */) { nameBytes[nameIndex++] = c; hash ^= c; hash *= 0x01000193; @@ -156,7 +156,7 @@ public void run() { } i++;// nl - hash = ((hash >> 16) ^ hash) & 65535; + hash &= 65535; results.putOrMerge(nameBytes, nameIndex, hash, ot); // reset nameIndex = 0; @@ -215,19 +215,23 @@ static class SimplerHashMap { public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { int slot = hash; - ResultRow slotValue = slots[slot]; - // found existing so update - if (slotValue != null && keys[slot].length == length && unsafeEquals(keys[slot], key, length)) { + ResultRow slotValue; + + // Linear probe for open slot + while ((slotValue = slots[slot]) != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { + slot++; + } + + // existing + if (slotValue != null) { slotValue.min = Math.min(slotValue.min, temp); slotValue.max = Math.max(slotValue.max, temp); slotValue.sum += temp; slotValue.count++; return; } - // Linear probe for open slot - while (slotValue != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { - slotValue = slots[++slot]; - } + + // new value slots[slot] = new ResultRow(temp); byte[] bytes = new byte[length]; System.arraycopy(key, 0, bytes, 0, length); From ee2b765f2b53ac9af674dc3f77f25b38852a0105 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Tue, 9 Jan 2024 16:11:00 +0000 Subject: [PATCH 14/21] Replace concurrenthashmap with lock --- .../onebrc/CalculateAverage_JamalMulla.java | 65 ++++++++++--------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 9cc96ce03..6ae6067d9 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -25,12 +25,17 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class CalculateAverage_JamalMulla { + static Map global = new HashMap<>(); private static final String FILE = "./measurements.txt"; private static final Unsafe UNSAFE = initUnsafe(); + private static final Lock lock = new ReentrantLock(); + private static final int FNV_32_INIT = 0x811c9dc5; + private static final int FNV_32_PRIME = 0x01000193; private static Unsafe initUnsafe() { try { @@ -96,12 +101,10 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep private static class CalculateTask implements Runnable { private final SimplerHashMap results; - private final Map global; private final Chunk chunk; - public CalculateTask(Map global, Chunk chunk) { + public CalculateTask(Chunk chunk) { this.results = new SimplerHashMap(); - this.global = global; this.chunk = chunk; } @@ -112,7 +115,7 @@ public void run() { short nameIndex = 0; int ot; // fnv hash - int hash = 0x811c9dc5; + int hash = FNV_32_INIT; long i = chunk.start; final long cl = chunk.start + chunk.length; @@ -121,7 +124,7 @@ public void run() { while ((c = UNSAFE.getByte(i++)) != 0x3B /* semi-colon */) { nameBytes[nameIndex++] = c; hash ^= c; - hash *= 0x01000193; + hash *= FNV_32_PRIME; } // temperature value follows @@ -164,27 +167,31 @@ public void run() { } // merge results with overall results - for (MapEntry me : results.getAll()) { - ResultRow rr; - ResultRow lr = me.row; - if ((rr = global.get(me.key)) != null) { - rr.min = Math.min(rr.min, lr.min); - rr.max = Math.max(rr.max, lr.max); - rr.count += lr.count; - rr.sum += lr.sum; - } - else { - global.put(me.key, lr); + List all = results.getAll(); + lock.lock(); + try { + for (MapEntry me : all) { + ResultRow rr; + ResultRow lr = me.row; + if ((rr = global.get(me.key)) != null) { + rr.min = Math.min(rr.min, lr.min); + rr.max = Math.max(rr.max, lr.max); + rr.count += lr.count; + rr.sum += lr.sum; + } + else { + global.put(me.key, lr); + } } } + finally { + lock.unlock(); + } } } public static void main(String[] args) throws IOException, InterruptedException { - Map results = new ConcurrentHashMap<>(); - - RandomAccessFile raFile = new RandomAccessFile(FILE, "r"); - FileChannel channel = raFile.getChannel(); + FileChannel channel = new RandomAccessFile(FILE, "r").getChannel(); int numThreads = 1; if (channel.size() > 64000) { numThreads = Runtime.getRuntime().availableProcessors(); @@ -192,7 +199,8 @@ public static void main(String[] args) throws IOException, InterruptedException List chunks = getChunks(numThreads, channel); List threads = new ArrayList<>(); for (Chunk chunk : chunks) { - Thread thread = new Thread(new CalculateTask(results, chunk)); + Thread thread = new Thread(new CalculateTask(chunk)); + thread.setPriority(Thread.MAX_PRIORITY); thread.start(); threads.add(thread); } @@ -200,18 +208,17 @@ public static void main(String[] args) throws IOException, InterruptedException t.join(); } // create treemap just to sort - System.out.println(new TreeMap<>(results)); + System.out.println(new TreeMap<>(global)); } record MapEntry(String key, ResultRow row) { } static class SimplerHashMap { - // based on spullara'ss - // can't have more than 10000 unique keys butwant to match max hash - int MAPSIZE = 65536; - ResultRow[] slots = new ResultRow[MAPSIZE]; - byte[][] keys = new byte[MAPSIZE][]; + // can't have more than 10000 unique keys but want to match max hash + final int MAPSIZE = 65536; + final ResultRow[] slots = new ResultRow[MAPSIZE]; + final byte[][] keys = new byte[MAPSIZE][]; public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { int slot = hash; @@ -273,7 +280,7 @@ static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) // Get all pairs public List getAll() { - List result = new ArrayList<>(slots.length); + final List result = new ArrayList<>(slots.length); for (int i = 0; i < slots.length; i++) { ResultRow slotValue = slots[i]; if (slotValue != null) { From db62fc3f3d4fabaac165a7aa7f8b0355706f13d4 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Wed, 10 Jan 2024 21:42:20 +0000 Subject: [PATCH 15/21] Small changes --- calculate_average_JamalMulla.sh | 2 +- .../morling/onebrc/CalculateAverage_JamalMulla.java | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index 7b9a603a0..b7e4709bc 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -17,5 +17,5 @@ source "$HOME/.sdkman/bin/sdkman-init.sh" sdk use java 21.0.1-graal 1>&2 -JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseNUMA -XX:+UseTransparentHugePages" +JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 6ae6067d9..770588556 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -30,7 +30,7 @@ public class CalculateAverage_JamalMulla { - static Map global = new HashMap<>(); + private static final Map global = new HashMap<>(); private static final String FILE = "./measurements.txt"; private static final Unsafe UNSAFE = initUnsafe(); private static final Lock lock = new ReentrantLock(); @@ -111,7 +111,7 @@ public CalculateTask(Chunk chunk) { @Override public void run() { // no names bigger than this - byte[] nameBytes = new byte[100]; + final byte[] nameBytes = new byte[100]; short nameIndex = 0; int ot; // fnv hash @@ -256,18 +256,27 @@ static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) return false; } } + if (i == length) { + return true; + } // leftover ints for (; i < (length - i & -4); i += 4) { if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b, i + baseOffset)) { return false; } } + if (i == length) { + return true; + } // leftover shorts for (; i < (length - i & -2); i += 2) { if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b, i + baseOffset)) { return false; } } + if (i == length) { + return true; + } // leftover bytes for (; i < (length - i); i++) { if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { From 3cf1ce14af323ee8db6885a182d183b1d3e0d932 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Thu, 11 Jan 2024 09:38:33 +0000 Subject: [PATCH 16/21] Script reorg --- calculate_average_JamalMulla.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index b7e4709bc..2f58829eb 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -15,7 +15,5 @@ # limitations under the License. # -source "$HOME/.sdkman/bin/sdkman-init.sh" -sdk use java 21.0.1-graal 1>&2 JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla From b52ce523eb222f73dea2d59705306a6a5db27aa5 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sat, 13 Jan 2024 21:21:18 +0000 Subject: [PATCH 17/21] Native --- calculate_average_JamalMulla.sh | 11 +++++-- prepare_JamalMulla.sh | 9 +++++- .../onebrc/CalculateAverage_JamalMulla.java | 32 +++++++++++++------ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index 2f58829eb..7fc77a17a 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -15,5 +15,12 @@ # limitations under the License. # -JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" -time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla + + +if [ -f target/CalculateAverage_JamalMulla_image ]; then + target/CalculateAverage_JamalMulla_image +else + JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints + -XX:+DumpPerfMapAtExit -XX:-OmitStackTraceInFastThrow -XX:+ShowHiddenFrames -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" + java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla +fi \ No newline at end of file diff --git a/prepare_JamalMulla.sh b/prepare_JamalMulla.sh index ec0f35f1c..119d4a70b 100755 --- a/prepare_JamalMulla.sh +++ b/prepare_JamalMulla.sh @@ -16,4 +16,11 @@ # source "$HOME/.sdkman/bin/sdkman-init.sh" -sdk use java 21.0.1-graal 1>&2 \ No newline at end of file +sdk use java 21.0.1-graal 1>&2 + +# ./mvnw clean verify removes target/ and will re-trigger native image creation. +if [ ! -f target/CalculateAverage_JamalMulla_image ]; then + NATIVE_IMAGE_OPTS="--gc=epsilon -O3 -march=native --enable-preview --strict-image-heap -H:-DeleteLocalSymbols -H:+PreserveFramePointer" + # Use -H:MethodFilter=CalculateAverage_thomaswue.* -H:Dump=:2 -H:PrintGraph=Network for IdealGraphVisualizer graph dumping. + native-image $NATIVE_IMAGE_OPTS -cp target/average-1.0.0-SNAPSHOT.jar -o target/CalculateAverage_JamalMulla_image dev.morling.onebrc.CalculateAverage_JamalMulla +fi \ No newline at end of file diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 770588556..68e0a8b4c 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -17,6 +17,7 @@ import sun.misc.Unsafe; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.foreign.Arena; @@ -32,6 +33,7 @@ public class CalculateAverage_JamalMulla { private static final Map global = new HashMap<>(); private static final String FILE = "./measurements.txt"; + private static final FileChannel channel = initChannel(); private static final Unsafe UNSAFE = initUnsafe(); private static final Lock lock = new ReentrantLock(); private static final int FNV_32_INIT = 0x811c9dc5; @@ -48,6 +50,15 @@ private static Unsafe initUnsafe() { } } + private static FileChannel initChannel() { + try { + return new RandomAccessFile(FILE, "r").getChannel(); + } + catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + } + private static final class ResultRow { private int min; private int max; @@ -73,14 +84,15 @@ private double round(double value) { private record Chunk(Long start, Long length) { } - static List getChunks(int numThreads, FileChannel channel) throws IOException { + static Chunk[] getChunks(int numThreads, FileChannel channel) throws IOException { // get all chunk boundaries final long filebytes = channel.size(); final long roughChunkSize = filebytes / numThreads; - final List chunks = new ArrayList<>(numThreads); + final Chunk[] chunks = new Chunk[numThreads]; final long mappedAddress = channel.map(FileChannel.MapMode.READ_ONLY, 0, filebytes, Arena.global()).address(); long chunkStart = 0; long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); + int i = 0; while (chunkStart < filebytes) { // unlikely we need to read more than this many bytes to find the next newline MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, @@ -90,10 +102,11 @@ static List getChunks(int numThreads, FileChannel channel) throws IOExcep chunkLength++; } - chunks.add(new Chunk(mappedAddress + chunkStart, chunkLength + 1)); + chunks[i] = new Chunk(mappedAddress + chunkStart, chunkLength + 1); // to skip the nl in the next chunk chunkStart += chunkLength + 1; chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); + i++; } return chunks; } @@ -112,7 +125,7 @@ public CalculateTask(Chunk chunk) { public void run() { // no names bigger than this final byte[] nameBytes = new byte[100]; - short nameIndex = 0; + byte nameIndex = 0; int ot; // fnv hash int hash = FNV_32_INIT; @@ -191,18 +204,17 @@ public void run() { } public static void main(String[] args) throws IOException, InterruptedException { - FileChannel channel = new RandomAccessFile(FILE, "r").getChannel(); int numThreads = 1; if (channel.size() > 64000) { numThreads = Runtime.getRuntime().availableProcessors(); } - List chunks = getChunks(numThreads, channel); - List threads = new ArrayList<>(); - for (Chunk chunk : chunks) { - Thread thread = new Thread(new CalculateTask(chunk)); + Chunk[] chunks = getChunks(numThreads, channel); + Thread[] threads = new Thread[chunks.length]; + for (int i = 0; i < chunks.length; i++) { + Thread thread = new Thread(new CalculateTask(chunks[i])); thread.setPriority(Thread.MAX_PRIORITY); thread.start(); - threads.add(thread); + threads[i] = thread; } for (Thread t : threads) { t.join(); From 577fc3a3df43974374fe81e914c7dba5f8eee5c4 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sun, 14 Jan 2024 17:48:14 +0000 Subject: [PATCH 18/21] Lots of inlining and improvements --- calculate_average_JamalMulla.sh | 4 +- prepare_JamalMulla.sh | 3 +- .../onebrc/CalculateAverage_JamalMulla.java | 368 +++++++++--------- 3 files changed, 183 insertions(+), 192 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index 7fc77a17a..c14034b80 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -20,7 +20,7 @@ if [ -f target/CalculateAverage_JamalMulla_image ]; then target/CalculateAverage_JamalMulla_image else - JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints - -XX:+DumpPerfMapAtExit -XX:-OmitStackTraceInFastThrow -XX:+ShowHiddenFrames -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages" + echo "JVM version" + JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages -XX:-TieredCompilation" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla fi \ No newline at end of file diff --git a/prepare_JamalMulla.sh b/prepare_JamalMulla.sh index 119d4a70b..8b4fc401d 100755 --- a/prepare_JamalMulla.sh +++ b/prepare_JamalMulla.sh @@ -20,7 +20,6 @@ sdk use java 21.0.1-graal 1>&2 # ./mvnw clean verify removes target/ and will re-trigger native image creation. if [ ! -f target/CalculateAverage_JamalMulla_image ]; then - NATIVE_IMAGE_OPTS="--gc=epsilon -O3 -march=native --enable-preview --strict-image-heap -H:-DeleteLocalSymbols -H:+PreserveFramePointer" - # Use -H:MethodFilter=CalculateAverage_thomaswue.* -H:Dump=:2 -H:PrintGraph=Network for IdealGraphVisualizer graph dumping. + NATIVE_IMAGE_OPTS="--gc=epsilon -O3 -march=native --enable-preview --strict-image-heap -R:MaxHeapSize=64m --link-at-build-time -da -dsa --no-fallback" native-image $NATIVE_IMAGE_OPTS -cp target/average-1.0.0-SNAPSHOT.jar -o target/CalculateAverage_JamalMulla_image dev.morling.onebrc.CalculateAverage_JamalMulla fi \ No newline at end of file diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 68e0a8b4c..c82db0ec9 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -17,27 +17,27 @@ import sun.misc.Unsafe; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.foreign.Arena; import java.lang.reflect.Field; -import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class CalculateAverage_JamalMulla { - private static final Map global = new HashMap<>(); + private static final long ALL_SEMIS = 0x3B3B3B3B3B3B3B3BL; + private static final Map global = new TreeMap<>(); private static final String FILE = "./measurements.txt"; - private static final FileChannel channel = initChannel(); private static final Unsafe UNSAFE = initUnsafe(); private static final Lock lock = new ReentrantLock(); - private static final int FNV_32_INIT = 0x811c9dc5; - private static final int FNV_32_PRIME = 0x01000193; + private static final long FXSEED = 0x517cc1b727220a95L; private static Unsafe initUnsafe() { try { @@ -50,15 +50,6 @@ private static Unsafe initUnsafe() { } } - private static FileChannel initChannel() { - try { - return new RandomAccessFile(FILE, "r").getChannel(); - } - catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - } - private static final class ResultRow { private int min; private int max; @@ -73,12 +64,13 @@ private ResultRow(int v) { } public String toString() { - return round(min) + "/" + round((double) (sum) / count) + "/" + round(max); - } + return STR."\{round(min)}/\{round((double) (sum) / count)}/\{round(max)}"; + } private double round(double value) { return Math.round(value) / 10.0; } + } private record Chunk(Long start, Long length) { @@ -94,11 +86,7 @@ static Chunk[] getChunks(int numThreads, FileChannel channel) throws IOException long chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); int i = 0; while (chunkStart < filebytes) { - // unlikely we need to read more than this many bytes to find the next newline - MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, - Math.min(Math.min(filebytes - chunkStart - chunkLength, chunkLength), 100)); - - while (mbb.get() != 0xA /* \n */) { + while (UNSAFE.getByte(mappedAddress + chunkStart + chunkLength) != 0xA /* \n */) { chunkLength++; } @@ -108,138 +96,101 @@ static Chunk[] getChunks(int numThreads, FileChannel channel) throws IOException chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); i++; } + return chunks; } - private static class CalculateTask implements Runnable { - - private final SimplerHashMap results; - private final Chunk chunk; + private static void run(Chunk chunk) { - public CalculateTask(Chunk chunk) { - this.results = new SimplerHashMap(); - this.chunk = chunk; - } + // can't have more than 10000 unique keys but want to match max hash + final int MAPSIZE = 65536; + final ResultRow[] slots = new ResultRow[MAPSIZE]; + final byte[][] keys = new byte[MAPSIZE][]; - @Override - public void run() { - // no names bigger than this - final byte[] nameBytes = new byte[100]; - byte nameIndex = 0; - int ot; - // fnv hash - int hash = FNV_32_INIT; - - long i = chunk.start; - final long cl = chunk.start + chunk.length; - while (i < cl) { - byte c; - while ((c = UNSAFE.getByte(i++)) != 0x3B /* semi-colon */) { - nameBytes[nameIndex++] = c; - hash ^= c; - hash *= FNV_32_PRIME; - } + byte nameLength; + int temp; + long hash; + + long i = chunk.start; + final long cl = chunk.start + chunk.length; + long word; + long hs; + long start; + byte c; + int slot; + ResultRow slotValue; + + while (i < cl) { + start = i; + hash = 0; + + word = UNSAFE.getLong(i); + + while ((hs = hasSemiColon(word)) == 0) { + hash = (Long.rotateLeft(hash, 5) ^ word) * FXSEED; // fxhash + i += 8; + word = UNSAFE.getLong(i); + } - // temperature value follows - c = UNSAFE.getByte(i++); - // we know the val has to be between -99.9 and 99.8 - // always with a single fractional digit - // represented as a byte array of either 4 or 5 characters - if (c == 0x2D /* minus sign */) { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 3) == 0xA) { - ot = (UNSAFE.getByte(i++) - 48) * 10; // char 1 - } - else { - ot = (UNSAFE.getByte(i++) - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 - } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 2 - ot = -ot; + if (hs == 0x8000L || hs == -0x7FFFFFFFFFFF8000L) + i++; + else if (hs == 0x800000L) + i += 2; + else if (hs == 0x80000000L) + i += 3; + else if (hs == 0x8000000000L) + i += 4; + else if (hs == 0x800000000000L) + i += 5; + else if (hs == 0x80000000000000L) + i += 6; + else if (hs == 0x8000000000000000L) + i += 7; + + // fxhash of what's left ((hs >>> 7) - 1) masks off the bytes from word that are before the semicolon + hash = (Long.rotateLeft(hash, 5) ^ word & (hs >>> 7) - 1) * FXSEED; + nameLength = (byte) (i++ - start); + + // temperature value follows + c = UNSAFE.getByte(i++); + // we know the val has to be between -99.9 and 99.8 + // always with a single fractional digit + // represented as a byte array of either 4 or 5 characters + if (c == 0x2D /* minus sign */) { + // could be either n.x or nn.x + if (UNSAFE.getByte(i + 3) == 0xA) { + temp = (UNSAFE.getByte(i++) - 48) * 10; // char 1 } else { - // could be either n.x or nn.x - if (UNSAFE.getByte(i + 2) == 0xA) { - ot = (c - 48) * 10; // char 1 - } - else { - ot = (c - 48) * 100; // char 1 - ot += (UNSAFE.getByte(i++) - 48) * 10; // char 2 - } - i++; // skip dot - ot += (UNSAFE.getByte(i++) - 48); // char 3 + temp = (UNSAFE.getByte(i++) - 48) * 100; // char 1 + temp += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } - - i++;// nl - hash &= 65535; - results.putOrMerge(nameBytes, nameIndex, hash, ot); - // reset - nameIndex = 0; - hash = 0x811c9dc5; + i++; // skip dot + temp += (UNSAFE.getByte(i++) - 48); // char 2 + temp = -temp; } - - // merge results with overall results - List all = results.getAll(); - lock.lock(); - try { - for (MapEntry me : all) { - ResultRow rr; - ResultRow lr = me.row; - if ((rr = global.get(me.key)) != null) { - rr.min = Math.min(rr.min, lr.min); - rr.max = Math.max(rr.max, lr.max); - rr.count += lr.count; - rr.sum += lr.sum; - } - else { - global.put(me.key, lr); - } + else { + // could be either n.x or nn.x + if (UNSAFE.getByte(i + 2) == 0xA) { + temp = (c - 48) * 10; // char 1 } + else { + temp = (c - 48) * 100; // char 1 + temp += (UNSAFE.getByte(i++) - 48) * 10; // char 2 + } + i++; // skip dot + temp += (UNSAFE.getByte(i++) - 48); // char 3 } - finally { - lock.unlock(); - } - } - } - - public static void main(String[] args) throws IOException, InterruptedException { - int numThreads = 1; - if (channel.size() > 64000) { - numThreads = Runtime.getRuntime().availableProcessors(); - } - Chunk[] chunks = getChunks(numThreads, channel); - Thread[] threads = new Thread[chunks.length]; - for (int i = 0; i < chunks.length; i++) { - Thread thread = new Thread(new CalculateTask(chunks[i])); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); - threads[i] = thread; - } - for (Thread t : threads) { - t.join(); - } - // create treemap just to sort - System.out.println(new TreeMap<>(global)); - } - - record MapEntry(String key, ResultRow row) { - } - - static class SimplerHashMap { - // can't have more than 10000 unique keys but want to match max hash - final int MAPSIZE = 65536; - final ResultRow[] slots = new ResultRow[MAPSIZE]; - final byte[][] keys = new byte[MAPSIZE][]; - public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { - int slot = hash; - ResultRow slotValue; + i++;// nl + // xor folding + slot = (int) (hash ^ hash >> 32) & 65535; // Linear probe for open slot - while ((slotValue = slots[slot]) != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { - slot++; + while (slots[slot] != null && !unsafeEquals(keys[slot], start, nameLength)) { + slot = ++slot % MAPSIZE; } + slotValue = slots[slot]; // existing if (slotValue != null) { @@ -247,69 +198,110 @@ public void putOrMerge(final byte[] key, final short length, final int hash, fin slotValue.max = Math.max(slotValue.max, temp); slotValue.sum += temp; slotValue.count++; - return; } - - // new value - slots[slot] = new ResultRow(temp); - byte[] bytes = new byte[length]; - System.arraycopy(key, 0, bytes, 0, length); - keys[slot] = bytes; + else { + // new value + slots[slot] = new ResultRow(temp); + byte[] bytes = new byte[nameLength]; + // copy the name bytes + UNSAFE.copyMemory(null, start, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, nameLength); + keys[slot] = bytes; + } } - static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) { - // byte by byte comparisons are slow, so do as big chunks as possible - final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; + // merge results with overall results + final List result = new ArrayList<>(); + for (int j = 0; j < slots.length; j++) { + slotValue = slots[j]; + if (slotValue != null) { + result.add(new MapEntry(new String(keys[j], StandardCharsets.UTF_8), slotValue)); + } + } - short i = 0; - // round down to nearest power of 8 - for (; i < (length & -8); i += 8) { - if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b, i + baseOffset)) { - return false; + lock.lock(); + try { + ResultRow rr; + ResultRow lr; + for (MapEntry me : result) { + lr = me.row; + if ((rr = global.get(me.key)) != null) { + rr.min = Math.min(rr.min, lr.min); + rr.max = Math.max(rr.max, lr.max); + rr.count += lr.count; + rr.sum += lr.sum; } - } - if (i == length) { - return true; - } - // leftover ints - for (; i < (length - i & -4); i += 4) { - if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b, i + baseOffset)) { - return false; + else { + global.put(me.key, lr); } } - if (i == length) { - return true; + } + finally { + lock.unlock(); + } + + } + + static boolean unsafeEquals(final byte[] a, final long b_address, final short length) { + // byte by byte comparisons are slow, so do as big chunks as possible + final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; + + short i = 0; + // round down to nearest power of 8 + for (; i < (length & -8); i += 8) { + if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b_address + i)) { + return false; } - // leftover shorts - for (; i < (length - i & -2); i += 2) { - if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b, i + baseOffset)) { - return false; - } + } + // leftover ints + for (; i < (length - i & -4); i += 4) { + if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b_address + i)) { + return false; } - if (i == length) { - return true; + } + // leftover shorts + for (; i < (length - i & -2); i += 2) { + if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b_address + i)) { + return false; } - // leftover bytes - for (; i < (length - i); i++) { - if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { - return false; - } + } + // leftover bytes + for (; i < length - i; i++) { + if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b_address + i)) { + return false; } - - return true; } - // Get all pairs - public List getAll() { - final List result = new ArrayList<>(slots.length); - for (int i = 0; i < slots.length; i++) { - ResultRow slotValue = slots[i]; - if (slotValue != null) { - result.add(new MapEntry(new String(keys[i], StandardCharsets.UTF_8), slotValue)); - } - } - return result; + return true; + } + + private static long hasSemiColon(final long n) { + // long filled just with semicolon + // taken from https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord + final long v = n ^ ALL_SEMIS; + return (v - 0x0101010101010101L) & (~v & 0x8080808080808080L); + } + + public static void main(String[] args) throws IOException, InterruptedException { + int numThreads = 1; + FileChannel channel = new RandomAccessFile(FILE, "r").getChannel(); + if (channel.size() > 64000) { + numThreads = Runtime.getRuntime().availableProcessors(); } + Chunk[] chunks = getChunks(numThreads, channel); + Thread[] threads = new Thread[chunks.length]; + for (int i = 0; i < chunks.length; i++) { + int finalI = i; + Thread thread = new Thread(() -> run(chunks[finalI])); + thread.setPriority(Thread.MAX_PRIORITY); + threads[i] = thread; + thread.start(); + } + for (Thread t : threads) { + t.join(); + } + System.out.println(global); } + record MapEntry(String key, ResultRow row) { + } } From fc897b6c1f89152ab87396757ca731a405fccd35 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sat, 20 Jan 2024 22:30:49 +0000 Subject: [PATCH 19/21] Add back length check --- .../java/dev/morling/onebrc/CalculateAverage_JamalMulla.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index c82db0ec9..b541d3779 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -187,7 +187,7 @@ else if (hs == 0x8000000000000000L) slot = (int) (hash ^ hash >> 32) & 65535; // Linear probe for open slot - while (slots[slot] != null && !unsafeEquals(keys[slot], start, nameLength)) { + while (slots[slot] != null && nameLength != keys[slot].length && !unsafeEquals(keys[slot], start, nameLength)) { slot = ++slot % MAPSIZE; } slotValue = slots[slot]; From b58548ed07e1697cdf8ff0a8265d25f93bd51b86 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sun, 21 Jan 2024 01:49:11 +0000 Subject: [PATCH 20/21] Fixes --- calculate_average_JamalMulla.sh | 1 - prepare_JamalMulla.sh | 2 +- .../onebrc/CalculateAverage_JamalMulla.java | 103 +++++++----------- 3 files changed, 42 insertions(+), 64 deletions(-) diff --git a/calculate_average_JamalMulla.sh b/calculate_average_JamalMulla.sh index c14034b80..119263bad 100755 --- a/calculate_average_JamalMulla.sh +++ b/calculate_average_JamalMulla.sh @@ -20,7 +20,6 @@ if [ -f target/CalculateAverage_JamalMulla_image ]; then target/CalculateAverage_JamalMulla_image else - echo "JVM version" JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:+TrustFinalNonStaticFields -XX:+UseTransparentHugePages -XX:-TieredCompilation" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JamalMulla fi \ No newline at end of file diff --git a/prepare_JamalMulla.sh b/prepare_JamalMulla.sh index 8b4fc401d..4d4403141 100755 --- a/prepare_JamalMulla.sh +++ b/prepare_JamalMulla.sh @@ -20,6 +20,6 @@ sdk use java 21.0.1-graal 1>&2 # ./mvnw clean verify removes target/ and will re-trigger native image creation. if [ ! -f target/CalculateAverage_JamalMulla_image ]; then - NATIVE_IMAGE_OPTS="--gc=epsilon -O3 -march=native --enable-preview --strict-image-heap -R:MaxHeapSize=64m --link-at-build-time -da -dsa --no-fallback" + NATIVE_IMAGE_OPTS="--gc=epsilon -O3 -march=native --enable-preview --strict-image-heap --link-at-build-time -R:MaxHeapSize=64m -da -dsa --no-fallback --initialize-at-build-time=dev.morling.onebrc.CalculateAverage_JamalMulla" native-image $NATIVE_IMAGE_OPTS -cp target/average-1.0.0-SNAPSHOT.jar -o target/CalculateAverage_JamalMulla_image dev.morling.onebrc.CalculateAverage_JamalMulla fi \ No newline at end of file diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index b541d3779..03fedaf5a 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -23,8 +23,6 @@ import java.lang.reflect.Field; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.locks.Lock; @@ -90,11 +88,10 @@ static Chunk[] getChunks(int numThreads, FileChannel channel) throws IOException chunkLength++; } - chunks[i] = new Chunk(mappedAddress + chunkStart, chunkLength + 1); + chunks[i++] = new Chunk(mappedAddress + chunkStart, chunkLength + 1); // to skip the nl in the next chunk chunkStart += chunkLength + 1; chunkLength = Math.min(filebytes - chunkStart - 1, roughChunkSize); - i++; } return chunks; @@ -107,7 +104,7 @@ private static void run(Chunk chunk) { final ResultRow[] slots = new ResultRow[MAPSIZE]; final byte[][] keys = new byte[MAPSIZE][]; - byte nameLength; + int nameLength; int temp; long hash; @@ -132,62 +129,52 @@ private static void run(Chunk chunk) { word = UNSAFE.getLong(i); } - if (hs == 0x8000L || hs == -0x7FFFFFFFFFFF8000L) - i++; - else if (hs == 0x800000L) - i += 2; - else if (hs == 0x80000000L) - i += 3; - else if (hs == 0x8000000000L) - i += 4; - else if (hs == 0x800000000000L) - i += 5; - else if (hs == 0x80000000000000L) - i += 6; - else if (hs == 0x8000000000000000L) - i += 7; + i += Long.numberOfTrailingZeros(hs) >> 3; // fxhash of what's left ((hs >>> 7) - 1) masks off the bytes from word that are before the semicolon hash = (Long.rotateLeft(hash, 5) ^ word & (hs >>> 7) - 1) * FXSEED; - nameLength = (byte) (i++ - start); + nameLength = (int) (i++ - start); // temperature value follows c = UNSAFE.getByte(i++); // we know the val has to be between -99.9 and 99.8 // always with a single fractional digit // represented as a byte array of either 4 or 5 characters - if (c == 0x2D /* minus sign */) { + if (c != 0x2D /* minus sign */) { // could be either n.x or nn.x - if (UNSAFE.getByte(i + 3) == 0xA) { - temp = (UNSAFE.getByte(i++) - 48) * 10; // char 1 + if (UNSAFE.getByte(i + 2) == 0xA) { + temp = (c - 48) * 10; // char 1 } else { - temp = (UNSAFE.getByte(i++) - 48) * 100; // char 1 + temp = (c - 48) * 100; // char 1 temp += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } - i++; // skip dot - temp += (UNSAFE.getByte(i++) - 48); // char 2 - temp = -temp; + // i++; // skip dot + temp += (UNSAFE.getByte(++i) - 48); // char 3 } else { // could be either n.x or nn.x - if (UNSAFE.getByte(i + 2) == 0xA) { - temp = (c - 48) * 10; // char 1 + if (UNSAFE.getByte(i + 3) == 0xA) { + temp = (UNSAFE.getByte(i) - 48) * 10; // char 1 + i += 2; } else { - temp = (c - 48) * 100; // char 1 - temp += (UNSAFE.getByte(i++) - 48) * 10; // char 2 + temp = (UNSAFE.getByte(i) - 48) * 100; // char 1 + temp += (UNSAFE.getByte(i + 1) - 48) * 10; // char 2 + i += 3; } - i++; // skip dot - temp += (UNSAFE.getByte(i++) - 48); // char 3 + // i++; // skip dot + temp += (UNSAFE.getByte(i) - 48); // char 2 + temp = -temp; } + i += 2; - i++;// nl + // i++;// nl // xor folding slot = (int) (hash ^ hash >> 32) & 65535; // Linear probe for open slot - while (slots[slot] != null && nameLength != keys[slot].length && !unsafeEquals(keys[slot], start, nameLength)) { + while (slots[slot] != null && !unsafeEquals(keys[slot], start, nameLength)) { slot = ++slot % MAPSIZE; } slotValue = slots[slot]; @@ -210,28 +197,23 @@ else if (hs == 0x8000000000000000L) } // merge results with overall results - final List result = new ArrayList<>(); - for (int j = 0; j < slots.length; j++) { - slotValue = slots[j]; - if (slotValue != null) { - result.add(new MapEntry(new String(keys[j], StandardCharsets.UTF_8), slotValue)); - } - } - + ResultRow rr; + String key; lock.lock(); try { - ResultRow rr; - ResultRow lr; - for (MapEntry me : result) { - lr = me.row; - if ((rr = global.get(me.key)) != null) { - rr.min = Math.min(rr.min, lr.min); - rr.max = Math.max(rr.max, lr.max); - rr.count += lr.count; - rr.sum += lr.sum; - } - else { - global.put(me.key, lr); + for (int j = 0; j < slots.length; j++) { + slotValue = slots[j]; + if (slotValue != null) { + key = new String(keys[j], StandardCharsets.UTF_8); + if ((rr = global.get(key)) != null) { + rr.min = Math.min(rr.min, slotValue.min); + rr.max = Math.max(rr.max, slotValue.max); + rr.count += slotValue.count; + rr.sum += slotValue.sum; + } + else { + global.put(key, slotValue); + } } } } @@ -241,11 +223,11 @@ else if (hs == 0x8000000000000000L) } - static boolean unsafeEquals(final byte[] a, final long b_address, final short length) { + static boolean unsafeEquals(final byte[] a, final long b_address, final int length) { // byte by byte comparisons are slow, so do as big chunks as possible final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; - short i = 0; + int i = 0; // round down to nearest power of 8 for (; i < (length & -8); i += 8) { if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b_address + i)) { @@ -277,7 +259,7 @@ static boolean unsafeEquals(final byte[] a, final long b_address, final short le private static long hasSemiColon(final long n) { // long filled just with semicolon // taken from https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord - final long v = n ^ ALL_SEMIS; + long v = n ^ ALL_SEMIS; return (v - 0x0101010101010101L) & (~v & 0x8080808080808080L); } @@ -293,15 +275,12 @@ public static void main(String[] args) throws IOException, InterruptedException int finalI = i; Thread thread = new Thread(() -> run(chunks[finalI])); thread.setPriority(Thread.MAX_PRIORITY); - threads[i] = thread; thread.start(); + threads[i] = thread; } for (Thread t : threads) { t.join(); } System.out.println(global); } - - record MapEntry(String key, ResultRow row) { - } } From 2d13d0295b736119f9a1c7dbcfa5b7137f762c55 Mon Sep 17 00:00:00 2001 From: Jamal Mulla Date: Sat, 27 Jan 2024 22:35:04 +0000 Subject: [PATCH 21/21] Small changes --- prepare_JamalMulla.sh | 2 +- .../onebrc/CalculateAverage_JamalMulla.java | 125 +++++++++--------- 2 files changed, 60 insertions(+), 67 deletions(-) diff --git a/prepare_JamalMulla.sh b/prepare_JamalMulla.sh index 4d4403141..d950d43ce 100755 --- a/prepare_JamalMulla.sh +++ b/prepare_JamalMulla.sh @@ -16,7 +16,7 @@ # source "$HOME/.sdkman/bin/sdkman-init.sh" -sdk use java 21.0.1-graal 1>&2 +sdk use java 21.0.2-graal 1>&2 # ./mvnw clean verify removes target/ and will re-trigger native image creation. if [ ! -f target/CalculateAverage_JamalMulla_image ]; then diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java index 03fedaf5a..7daf1997f 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JamalMulla.java @@ -37,6 +37,17 @@ public class CalculateAverage_JamalMulla { private static final Lock lock = new ReentrantLock(); private static final long FXSEED = 0x517cc1b727220a95L; + private static final long[] masks = { + 0x0, + 0x00000000000000FFL, + 0x000000000000FFFFL, + 0x0000000000FFFFFFL, + 0x00000000FFFFFFFFL, + 0x000000FFFFFFFFFFL, + 0x0000FFFFFFFFFFFFL, + 0x00FFFFFFFFFFFFFFL + }; + private static Unsafe initUnsafe() { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); @@ -53,17 +64,21 @@ private static final class ResultRow { private int max; private long sum; private int count; + private final long keyStart; + private final byte keyLength; - private ResultRow(int v) { + private ResultRow(int v, final long keyStart, final byte keyLength) { this.min = v; this.max = v; this.sum = v; this.count = 1; + this.keyStart = keyStart; + this.keyLength = keyLength; } public String toString() { - return STR."\{round(min)}/\{round((double) (sum) / count)}/\{round(max)}"; - } + return round(min) + "/" + round((double) (sum) / count) + "/" + round(max); + } private double round(double value) { return Math.round(value) / 10.0; @@ -102,9 +117,8 @@ private static void run(Chunk chunk) { // can't have more than 10000 unique keys but want to match max hash final int MAPSIZE = 65536; final ResultRow[] slots = new ResultRow[MAPSIZE]; - final byte[][] keys = new byte[MAPSIZE][]; - int nameLength; + byte nameLength; int temp; long hash; @@ -115,6 +129,7 @@ private static void run(Chunk chunk) { long start; byte c; int slot; + long n; ResultRow slotValue; while (i < cl) { @@ -123,17 +138,21 @@ private static void run(Chunk chunk) { word = UNSAFE.getLong(i); - while ((hs = hasSemiColon(word)) == 0) { - hash = (Long.rotateLeft(hash, 5) ^ word) * FXSEED; // fxhash + while (true) { + n = word ^ ALL_SEMIS; + hs = (n - 0x0101010101010101L) & (~n & 0x8080808080808080L); + if (hs != 0) + break; + hash = (hash ^ word) * FXSEED; i += 8; word = UNSAFE.getLong(i); } i += Long.numberOfTrailingZeros(hs) >> 3; - // fxhash of what's left ((hs >>> 7) - 1) masks off the bytes from word that are before the semicolon - hash = (Long.rotateLeft(hash, 5) ^ word & (hs >>> 7) - 1) * FXSEED; - nameLength = (int) (i++ - start); + // hash of what's left ((hs >>> 7) - 1) masks off the bytes from word that are before the semicolon + hash = (hash ^ word & (hs >>> 7) - 1) * FXSEED; + nameLength = (byte) (i++ - start); // temperature value follows c = UNSAFE.getByte(i++); @@ -149,7 +168,6 @@ private static void run(Chunk chunk) { temp = (c - 48) * 100; // char 1 temp += (UNSAFE.getByte(i++) - 48) * 10; // char 2 } - // i++; // skip dot temp += (UNSAFE.getByte(++i) - 48); // char 3 } else { @@ -163,56 +181,57 @@ private static void run(Chunk chunk) { temp += (UNSAFE.getByte(i + 1) - 48) * 10; // char 2 i += 3; } - // i++; // skip dot temp += (UNSAFE.getByte(i) - 48); // char 2 temp = -temp; } i += 2; - // i++;// nl // xor folding slot = (int) (hash ^ hash >> 32) & 65535; // Linear probe for open slot - while (slots[slot] != null && !unsafeEquals(keys[slot], start, nameLength)) { - slot = ++slot % MAPSIZE; + while ((slotValue = slots[slot]) != null && (slotValue.keyLength != nameLength || !unsafeEquals(slotValue.keyStart, start, nameLength))) { + slot = (slot + 1) % MAPSIZE; } - slotValue = slots[slot]; // existing if (slotValue != null) { - slotValue.min = Math.min(slotValue.min, temp); - slotValue.max = Math.max(slotValue.max, temp); slotValue.sum += temp; slotValue.count++; + if (temp > slotValue.max) { + slotValue.max = temp; + continue; + } + if (temp < slotValue.min) + slotValue.min = temp; + } else { // new value - slots[slot] = new ResultRow(temp); - byte[] bytes = new byte[nameLength]; - // copy the name bytes - UNSAFE.copyMemory(null, start, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, nameLength); - keys[slot] = bytes; + slots[slot] = new ResultRow(temp, start, nameLength); } } // merge results with overall results ResultRow rr; String key; + byte[] bytes; lock.lock(); try { - for (int j = 0; j < slots.length; j++) { - slotValue = slots[j]; - if (slotValue != null) { - key = new String(keys[j], StandardCharsets.UTF_8); + for (ResultRow resultRow : slots) { + if (resultRow != null) { + bytes = new byte[resultRow.keyLength]; + // copy the name bytes + UNSAFE.copyMemory(null, resultRow.keyStart, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, resultRow.keyLength); + key = new String(bytes, StandardCharsets.UTF_8); if ((rr = global.get(key)) != null) { - rr.min = Math.min(rr.min, slotValue.min); - rr.max = Math.max(rr.max, slotValue.max); - rr.count += slotValue.count; - rr.sum += slotValue.sum; + rr.min = Math.min(rr.min, resultRow.min); + rr.max = Math.max(rr.max, resultRow.max); + rr.count += resultRow.count; + rr.sum += resultRow.sum; } else { - global.put(key, slotValue); + global.put(key, resultRow); } } } @@ -223,44 +242,17 @@ private static void run(Chunk chunk) { } - static boolean unsafeEquals(final byte[] a, final long b_address, final int length) { + static boolean unsafeEquals(final long a_address, final long b_address, final byte b_length) { // byte by byte comparisons are slow, so do as big chunks as possible - final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; - - int i = 0; - // round down to nearest power of 8 - for (; i < (length & -8); i += 8) { - if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b_address + i)) { - return false; - } - } - // leftover ints - for (; i < (length - i & -4); i += 4) { - if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b_address + i)) { + byte i = 0; + for (; i < (b_length & -8); i += 8) { + if (UNSAFE.getLong(a_address + i) != UNSAFE.getLong(b_address + i)) { return false; } } - // leftover shorts - for (; i < (length - i & -2); i += 2) { - if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b_address + i)) { - return false; - } - } - // leftover bytes - for (; i < length - i; i++) { - if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b_address + i)) { - return false; - } - } - - return true; - } - - private static long hasSemiColon(final long n) { - // long filled just with semicolon - // taken from https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord - long v = n ^ ALL_SEMIS; - return (v - 0x0101010101010101L) & (~v & 0x8080808080808080L); + if (i == b_length) + return true; + return (UNSAFE.getLong(a_address + i) & masks[b_length - i]) == (UNSAFE.getLong(b_address + i) & masks[b_length - i]); } public static void main(String[] args) throws IOException, InterruptedException { @@ -282,5 +274,6 @@ public static void main(String[] args) throws IOException, InterruptedException t.join(); } System.out.println(global); + channel.close(); } }