Merge pull request #8 from ambud/main
Optimize file system reads in MemQ by leveraging channels to reduce memory copies
ambud authored Aug 19, 2022
2 parents 34d120f + 167fb66 commit 93fe718
Showing 8 changed files with 216 additions and 100 deletions.
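
For context, the core of this change replaces a stream-based batch write, which first flattens the Netty CompositeByteBuf into a temporary byte[], with a FileChannel write that lets Netty drain the buffer components straight into the file. Below is a minimal sketch of the two paths, simplified from the diff that follows; the class and method names here are illustrative, not the exact MemQ code.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

import io.netty.buffer.CompositeByteBuf;

public final class ChannelWriteSketch {

  // Stream-based path: flattens the batch into a temporary on-heap array first.
  static void writeViaOutputStream(CompositeByteBuf content, File file) throws IOException {
    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
      byte[] copy = new byte[content.readableBytes()];
      content.readBytes(copy); // copy #1: ByteBuf components -> byte[]
      os.write(copy);          // copy #2: byte[] -> stream buffer -> file
    }
  }

  // Channel-based path: Netty writes the composite buffer into the channel directly.
  static void writeViaChannel(CompositeByteBuf content, File file) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
         FileChannel channel = raf.getChannel()) {
      int remaining = content.readableBytes();
      long position = 0;
      while (remaining > 0) {
        // readBytes(FileChannel, position, length) advances the reader index and
        // returns how many bytes were actually written at that file position.
        int written = content.readBytes(channel, position, remaining);
        position += written;
        remaining -= written;
      }
      channel.force(true); // flush data and metadata before returning the path
    }
  }
}

The channel path skips the intermediate on-heap array and avoids flattening the composite buffer, which is where the memory-copy savings described in the commit message come from.
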
13 changes: 8 additions & 5 deletions memq-client/pom.xml
@@ -1,7 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.pinterest.memq</groupId>
@@ -56,7 +54,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version><!--$NO-MVN-MAN-VER$ -->
<version>4.12</version> <!--$NO-MVN-MAN-VER$ -->
<scope>test</scope>
</dependency>
<dependency>
@@ -98,6 +96,11 @@
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -150,4 +153,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
@@ -34,7 +34,7 @@ public enum Compression {
ZSTD(2, 0,
is -> new ZstdInputStreamNoFinalizer(is, RecyclingBufferPool.INSTANCE),
os -> new ZstdOutputStreamNoFinalizer(os, RecyclingBufferPool.INSTANCE)
);
)));

public byte id;
public int minBufferSize;
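
The pom change above pulls in org.lz4:lz4-java 1.8.0 alongside this Compression hunk; the exact enum change is not fully visible in this view. Purely as an illustration of how lz4-java's block streams line up with the InputStream/OutputStream factories used by the other codecs (this is not code from the commit):

import java.io.InputStream;
import java.io.OutputStream;

import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;

// Hypothetical helpers mirroring the enum's stream-wrapping factories; not part of this commit.
final class Lz4StreamSketch {
  static InputStream wrapInput(InputStream is) {
    return new LZ4BlockInputStream(is);  // decompresses LZ4 block format
  }

  static OutputStream wrapOutput(OutputStream os) {
    return new LZ4BlockOutputStream(os); // compresses with 64 KB blocks by default
  }
}
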
@@ -19,11 +19,13 @@
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
@@ -78,6 +80,7 @@
* This StorageHandler is used for local filesystem based PubSub.
*/
@StorageHandlerName(name = "filesystem")
@SuppressWarnings("unused")
public class FileSystemStorageHandler extends ReadBrokerStorageHandler {

public static final String OPTIMIZATION_SENDFILE = "optimization.sendfile";
@@ -125,8 +128,7 @@ public void initWriter(Properties properties,
this.notificationPublishingTimer = MiscUtils.oneMinuteWindowTimer(registry,
"output.notification.publish.latency");
}
this.persistTimer = MiscUtils.oneMinuteWindowTimer(registry,
"storage.persist.latency");
this.persistTimer = MiscUtils.oneMinuteWindowTimer(registry, "storage.persist.latency");
this.timeoutExceptionCounter = registry.counter("output.timeout.exceptions");
this.requestExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory());
this.executionTimer = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
@@ -169,7 +171,7 @@ public void writeOutput(int sizeInBytes,
CompositeByteBuf content = CustomS3Async2StorageHandler
.messageAndHeaderToCompositeBuffer(buffers, batchHeader);
Context persistTime = persistTimer.time();
String writePath = speculativeUpload(relativeFileName, content);
String writePath = speculativeUpload(sizeInBytes, relativeFileName, content);
persistTime.stop();
buffers.forEach(ReferenceCounted::release);
content.release();
@@ -189,7 +191,8 @@ }
}
}

private String speculativeUpload(String baseFileName,
private String speculativeUpload(int sizeInBytes,
String baseFileName,
CompositeByteBuf content) throws WriteFailedException {
int attempt = 0;
Map<String, Future<String>> futureMap = new HashMap<>();
@@ -198,52 +201,68 @@ private String speculativeUpload(String baseFileName,
final int currentRetryTimeoutMs = retryTimeoutMillis;
String result = null;
boolean hasSucceeded = false;
while (attempt < maxAttempts) {
final int timeout = attempt == currentMaxAttempts - 1 ? LAST_ATTEMPT_TIMEOUT
: currentRetryTimeoutMs;
CompletableFuture<String> task = new CompletableFuture<>();
if (maxAttempts == 1) {
// optimize for single write path and don't use executors
final int localAttemptId = attempt;
// add attempt number as a suffix
final String key = topic + "/" + HOSTNAME + "/" + baseFileName + "_" + localAttemptId;
Callable<String> uploadAttempt = writeFileTask(content, task, localAttemptId, key);
Future<String> future = requestExecutor.submit(uploadAttempt);
futureMap.put(key, future);
taskMap.put(key, task);

CompletableFuture<String> resultFuture = anyUploadResultOrTimeout(taskMap.values(),
Duration.ofMillis(timeout));
CompletableFuture<String> task = new CompletableFuture<>();
String key = new StringBuilder().append(topic).append("/").append(HOSTNAME)
.append("/").append(baseFileName).append("_").append(localAttemptId).toString();
try {
result = resultFuture.get();
registry.counter("storage.fs.succeeded").inc();
hasSucceeded = true;
} catch (ExecutionException ee) {
if (ee.getCause() instanceof TimeoutException) {
timeoutExceptionCounter.inc();
} else {
logger.log(Level.SEVERE, "Request failed", ee);
}
return writeFileTask(sizeInBytes, content, task, localAttemptId, key).call();
} catch (Exception e) {
logger.log(Level.SEVERE, "Request failed", e);
}
attempt++;
}
for (Map.Entry<String, Future<String>> entry : futureMap.entrySet()) {
if (result != null && entry.getKey().endsWith(result)) {
continue;
throw new WriteFailedException(e);
}
entry.getValue().cancel(true);
}
if (result == null) {
throw new WriteFailedException("All upload attempts failed");
} else if (!hasSucceeded) {
throw new WriteFailedException("Upload failed due to error out: " + result);
} else {
registry.counter("storage.fs.attempt." + attempt).inc();
return result;
while (attempt < maxAttempts) {
final int timeout = attempt == currentMaxAttempts - 1 ? LAST_ATTEMPT_TIMEOUT
: currentRetryTimeoutMs;
CompletableFuture<String> task = new CompletableFuture<>();
final int localAttemptId = attempt;
// add attempt number as a suffix
final String key = new StringBuilder().append(topic).append("/").append(HOSTNAME)
.append("/").append(baseFileName).append("_").append(localAttemptId).toString();
Callable<String> uploadAttempt = writeFileTask(sizeInBytes, content, task, localAttemptId,
key);
Future<String> future = requestExecutor.submit(uploadAttempt);
futureMap.put(key, future);
taskMap.put(key, task);

CompletableFuture<String> resultFuture = anyUploadResultOrTimeout(taskMap.values(),
Duration.ofMillis(timeout));
try {
result = resultFuture.get();
registry.counter("storage.fs.succeeded").inc();
hasSucceeded = true;
} catch (ExecutionException ee) {
if (ee.getCause() instanceof TimeoutException) {
timeoutExceptionCounter.inc();
} else {
logger.log(Level.SEVERE, "Request failed", ee);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Request failed", e);
}
attempt++;
}
for (Map.Entry<String, Future<String>> entry : futureMap.entrySet()) {
if (result != null && entry.getKey().endsWith(result)) {
continue;
}
entry.getValue().cancel(true);
}
if (result == null) {
throw new WriteFailedException("All upload attempts failed");
} else if (!hasSucceeded) {
throw new WriteFailedException("Upload failed due to error out: " + result);
} else {
registry.counter("storage.fs.attempt." + attempt).inc();
return result;
}
}
}
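
The reshaped speculativeUpload above keeps the existing behavior of racing write attempts against a per-attempt timeout, and adds a fast path: when only a single attempt is configured, the write now runs on the calling thread instead of being submitted to the request executor. A rough sketch of that control flow, with simplified types, no metrics, and an assumed any-of helper (not the exact MemQ code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class SpeculativeWriteSketch {
  private final ExecutorService executor = Executors.newCachedThreadPool();

  String write(int maxAttempts, long attemptTimeoutMs, Callable<String> attempt) throws Exception {
    if (maxAttempts == 1) {
      // Fast path (mirrors the new code): run the single attempt inline, skip the executor.
      return attempt.call();
    }
    CompletionService<String> completions = new ExecutorCompletionService<>(executor);
    List<Future<String>> inflight = new ArrayList<>();
    String result = null;
    for (int i = 0; i < maxAttempts && result == null; i++) {
      inflight.add(completions.submit(attempt));
      // Wait for ANY in-flight attempt to finish within this attempt's timeout;
      // on timeout (poll returns null) fall through and launch another speculative attempt.
      Future<String> done = completions.poll(attemptTimeoutMs, TimeUnit.MILLISECONDS);
      if (done != null) {
        try {
          result = done.get();
        } catch (ExecutionException failedAttempt) {
          // this attempt failed; the loop may start another one
        }
      }
    }
    for (Future<String> f : inflight) {
      f.cancel(true); // cancel losers; cancelling an already-completed future is a no-op
    }
    if (result == null) {
      throw new IOException("All upload attempts failed");
    }
    return result;
  }
}
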

public Callable<String> writeFileTask(CompositeByteBuf content,
public Callable<String> writeFileTask(int sizeInBytes,
CompositeByteBuf content,
CompletableFuture<String> task,
final int attemptId,
final String key) {
@@ -253,14 +272,8 @@ public String call() throws Exception {
try {
File fileToWrite = new File(storageDirs[attemptId], key);
fileToWrite.getParentFile().mkdirs();
try (OutputStream os = new BufferedOutputStream(new FileOutputStream(fileToWrite))) {
if (!dryrun) {
content.readBytes(os, content.readableBytes());
}
os.close();
} catch (Exception e) {
throw new WriteFailedException(e);
}
writeViaChannel(sizeInBytes, content, fileToWrite);
// writeViaOutputstream(content, fileToWrite);
String ur = fileToWrite.getAbsolutePath();
task.complete(ur);
return ur;
@@ -269,6 +282,37 @@ public String call() throws Exception {
throw e;
}
}

private void writeViaOutputstream(CompositeByteBuf content,
File fileToWrite) throws WriteFailedException {
try (OutputStream os = new BufferedOutputStream(new FileOutputStream(fileToWrite))) {
if (!dryrun) {
byte[] buffer = new byte[content.readableBytes()];
content.readBytes(buffer);
os.write(buffer);
// content.readBytes(os, content.readableBytes());
}
os.close();
FileChannel fileChannel = null;
} catch (Exception e) {
throw new WriteFailedException(e);
}
}

private void writeViaChannel(int sizeInBytes,
CompositeByteBuf content,
File fileToWrite) throws FileNotFoundException, IOException {
RandomAccessFile raf = new RandomAccessFile(fileToWrite, "rw");
FileChannel channel = raf.getChannel();
int readableBytes = content.readableBytes();
int bytesRead = 0;
while (bytesRead < readableBytes) {
bytesRead += content.readBytes(channel, 0, readableBytes - bytesRead);
}
channel.force(true);
channel.close();
raf.close();
}
};
}

@@ -395,8 +439,7 @@ public BatchHeader fetchHeaderForBatch(JsonObject objectNotification) throws IOE
} else {
BatchData batch;
try {
batch = readBatchHeader(objectNotification.get(TOPIC).getAsString(),
objectNotification);
batch = readBatchHeader(objectNotification.get(TOPIC).getAsString(), objectNotification);
} catch (Exception e) {
throw new IOException(e);
}
@@ -434,8 +477,8 @@ public DataInputStream fetchMessageAtIndex(JsonObject objectNotification,
try {
logger.fine(
() -> "Making index message fetch request:" + objectNotification + " index:" + index);
batch = readBatchAtIndex(objectNotification.get(TOPIC).getAsString(),
objectNotification, index);
batch = readBatchAtIndex(objectNotification.get(TOPIC).getAsString(), objectNotification,
index);
} catch (Exception e) {
throw new IOException(e);
}
@@ -451,8 +494,7 @@ public DataInputStream fetchMessageAtIndex(JsonObject objectNotification,
* @return
*/
protected boolean isValidReadRequest(String topic, String filePath) {
return storageDirList.stream()
.anyMatch(storageDir -> filePath.startsWith(storageDir));
return storageDirList.stream().anyMatch(storageDir -> filePath.startsWith(storageDir));
}

public CompletableFuture<String> anyUploadResultOrTimeout(Collection<CompletableFuture<String>> tasks,
@@ -41,6 +41,8 @@

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import org.apache.commons.codec.digest.DigestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
@@ -447,7 +449,7 @@ public static List<ByteBuf> messageToBufferList(List<Message> messages) {

public static CompositeByteBuf messageAndHeaderToCompositeBuffer(final List<ByteBuf> messageByteBufs,
ByteBuf batchHeaders) {
CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
CompositeByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer();
byteBuf.addComponent(true, batchHeaders.retainedDuplicate());
byteBuf.addComponents(true,
messageByteBufs.stream().map(ByteBuf::retainedDuplicate).collect(Collectors.toList()));
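
The one-line change in messageAndHeaderToCompositeBuffer above switches the composite batch buffer from ByteBufAllocator.DEFAULT to an explicit PooledByteBufAllocator.DEFAULT (the default allocator is usually pooled already unless overridden, so this mostly pins the choice). A minimal, self-contained sketch of the pattern, with hypothetical helper names and release handling that is assumed rather than copied from MemQ:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

final class PooledCompositeSketch {

  // Builds a batch buffer without copying: the composite only references its components.
  static CompositeByteBuf buildBatch(ByteBuf header, ByteBuf payload) {
    CompositeByteBuf batch = PooledByteBufAllocator.DEFAULT.compositeBuffer();
    batch.addComponent(true, header.retainedDuplicate());
    batch.addComponent(true, payload.retainedDuplicate());
    return batch; // caller releases once the batch has been written out
  }

  public static void main(String[] args) {
    ByteBuf header = Unpooled.wrappedBuffer(new byte[] {1, 2, 3});
    ByteBuf payload = Unpooled.wrappedBuffer(new byte[] {4, 5, 6, 7});
    CompositeByteBuf batch = buildBatch(header, payload);
    System.out.println("readable bytes: " + batch.readableBytes()); // prints 7
    batch.release();
    header.release();
    payload.release();
  }
}

As in the diff, the components are added with retainedDuplicate(), so the original message buffers can be released independently of the composite.
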
@@ -84,7 +84,6 @@ public static byte[] getMemqBatchData(String baseLogMessage,
for (ByteBuffer byteBuffer : bufList) {
os.write(byteBuffer.array());
}
System.out.println("Created:" + bufList.size() + " buffers");
return os.toByteArray();
}

@@ -96,7 +95,7 @@ public static byte[] createMessage(String baseLogMessage,
List<byte[]> messageIdHashes,
boolean enableTestHeaders) throws IOException {
Semaphore maxRequestLock = new Semaphore(1);
MemqNettyRequest task = new MemqNettyRequest("xyz", 1L, Compression.GZIP,
MemqNettyRequest task = new MemqNettyRequest("xyz", 1L, compression,
maxRequestLock, true, 1024 * 1024, 100, null, null, 10_000, false);
for (int k = 0; k < logMessageCount; k++) {
byte[] bytes = getLogMessageBytes.apply(baseLogMessage, k);
(The remaining changed files in this commit were not rendered on this page.)
