Skip to content

Commit

Permalink
Merge pull request #5 from kmgowda/kmg-pulsar
Browse files Browse the repository at this point in the history
Apache pulsar benchmarking Implementation
  • Loading branch information
kmgowda authored Jan 7, 2020
2 parents 84720b0 + 268bcb6 commit 46edffb
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 32 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ buildscript {
"io.pravega:pravega-common:0.5.0",
"commons-cli:commons-cli:1.3.1",
"org.apache.commons:commons-csv:1.5",
"org.apache.kafka:kafka-clients:2.3.0"
"org.apache.kafka:kafka-clients:2.3.0",
"org.apache.pulsar:pulsar-client:2.4.2"

runtime "org.slf4j:slf4j-simple:1.7.14"
}
Expand Down
124 changes: 113 additions & 11 deletions src/main/java/io/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.perf.drivers.kafka.KafkaReaderWorker;
import io.perf.drivers.kafka.KafkaWriterWorker;

import io.perf.drivers.pulsar.PulsarWriterWorker;
import io.perf.drivers.pulsar.PulsarReaderWorker;

import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.ReaderGroup;
Expand All @@ -34,6 +37,8 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import org.apache.pulsar.client.api.PulsarClient;

import java.io.IOException;
import java.net.URISyntaxException;

Expand All @@ -59,6 +64,7 @@
import java.util.concurrent.ExecutorService;
import java.util.Properties;
import java.util.Locale;
import java.io.IOException;

/**
* Performance benchmark for Pravega.
Expand Down Expand Up @@ -102,6 +108,7 @@ public static void main(String[] args) {
options.addOption("readcsv", true, "CSV file to record read latencies");
options.addOption("fork", true, "Use Fork join Pool");
options.addOption("kafka", true, "Kafka Benchmarking");
options.addOption("pulsar", true, "Pulsar Benchmarking");

options.addOption("help", false, "Help message");

Expand Down Expand Up @@ -143,10 +150,22 @@ public void run() {
executor.awaitTermination(1, TimeUnit.SECONDS);
perfTest.shutdown(System.currentTimeMillis());
if (consumers != null) {
consumers.forEach(ReaderWorker::close);
consumers.forEach(c-> {
try {
c.close();
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
if (producers != null) {
producers.forEach(WriterWorker::close);
producers.forEach(c-> {
try {
c.close();
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
perfTest.closeReaderGroup();
} catch (InterruptedException ex) {
Expand All @@ -160,10 +179,22 @@ public void run() {
executor.awaitTermination(1, TimeUnit.SECONDS);
perfTest.shutdown(System.currentTimeMillis());
if (consumers != null) {
consumers.forEach(ReaderWorker::close);
consumers.forEach(c-> {
try {
c.close();
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
if (producers != null) {
producers.forEach(WriterWorker::close);
producers.forEach(c-> {
try {
c.close();
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
perfTest.closeReaderGroup();
} catch (Exception ex) {
Expand All @@ -178,7 +209,12 @@ public static Test createTest(long startTime, CommandLine commandline, Options o
if (runKafka) {
return new KafkaTest(startTime, commandline);
} else {
return new PravegaTest(startTime, commandline);
boolean runPulsar = Boolean.parseBoolean(commandline.getOptionValue("pulsar", "false"));
if (runPulsar) {
return new PulsarTest(startTime,commandline);
} else {
return new PravegaTest(startTime, commandline);
}
}
} catch (IllegalArgumentException ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -367,7 +403,7 @@ public ExecutorService getExecutor() {

public abstract void closeReaderGroup();

public abstract List<WriterWorker> getProducers();
public abstract List<WriterWorker> getProducers() throws IOException;

public abstract List<ReaderWorker> getConsumers() throws URISyntaxException;

Expand Down Expand Up @@ -418,7 +454,7 @@ public List<WriterWorker> getProducers() {
.map(i -> new PravegaTransactionWriterWorker(i, eventsPerProducer,
runtimeSec, false,
messageSize, startTime,
produceStats, streamName,
produceStats, streamName, TIMEOUT,
eventsPerSec, writeAndRead, factory,
transactionPerCommit))
.collect(Collectors.toList());
Expand All @@ -428,7 +464,7 @@ public List<WriterWorker> getProducers() {
.map(i -> new PravegaWriterWorker(i, eventsPerProducer,
EventsPerFlush, runtimeSec, false,
messageSize, startTime, produceStats,
streamName, eventsPerSec, writeAndRead, factory))
streamName, TIMEOUT, eventsPerSec, writeAndRead, factory))
.collect(Collectors.toList());
}
} else {
Expand All @@ -445,7 +481,7 @@ public List<ReaderWorker> getConsumers() throws URISyntaxException {
.boxed()
.map(i -> new PravegaReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
rdGrpName, TIMEOUT, writeAndRead, factory))
streamName, rdGrpName, TIMEOUT, writeAndRead, factory))
.collect(Collectors.toList());
} else {
readers = null;
Expand Down Expand Up @@ -519,8 +555,8 @@ public List<WriterWorker> getProducers() {
.boxed()
.map(i -> new KafkaWriterWorker(i, eventsPerProducer,
EventsPerFlush, runtimeSec, false,
messageSize, startTime, produceStats,
streamName, eventsPerSec, writeAndRead, producerConfig))
messageSize, startTime, produceStats, streamName,
TIMEOUT, eventsPerSec, writeAndRead, producerConfig))
.collect(Collectors.toList());
}
} else {
Expand Down Expand Up @@ -550,4 +586,70 @@ public void closeReaderGroup() {
}
}


static private class PulsarTest extends Test {
final PulsarClient client;

PulsarTest(long startTime, CommandLine commandline) throws
IllegalArgumentException, URISyntaxException, InterruptedException, Exception {
super(startTime, commandline);
client = PulsarClient.builder().serviceUrl(controllerUri).build();
}

public List<WriterWorker> getProducers() throws IOException {
final List<WriterWorker> writers;

if (producerCount > 0) {
if (transactionPerCommit > 0) {
throw new IllegalArgumentException("Pulsar Transactions are not supported");
} else {
writers = IntStream.range(0, producerCount)
.boxed()
.map(i -> {
try {
return new PulsarWriterWorker(i, eventsPerProducer,
EventsPerFlush, runtimeSec, false,
messageSize, startTime, produceStats,
streamName, TIMEOUT, eventsPerSec, writeAndRead, client);
} catch (IOException ex) {
ex.printStackTrace();
return null;
}
}).collect(Collectors.toList());
}
} else {
writers = null;
}

return writers;
}

public List<ReaderWorker> getConsumers() throws URISyntaxException {
final List<ReaderWorker> readers;
if (consumerCount > 0) {
readers = IntStream.range(0, consumerCount)
.boxed()
.map(i -> {
try {
return new PulsarReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
streamName, rdGrpName, TIMEOUT, writeAndRead, client);
} catch (IOException ex) {
ex.printStackTrace();
return null;
}
}).collect(Collectors.toList());
} else {
readers = null;
}
return readers;
}

@Override
public void closeReaderGroup() {

}
}


}
8 changes: 4 additions & 4 deletions src/main/java/io/perf/core/ReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public abstract class ReaderWorker extends Worker implements Callable<Void> {
final private boolean writeAndRead;

public ReaderWorker(int readerId, int events, int secondsToRun, long start,
PerfStats stats, String readerGrp, int timeout, boolean writeAndRead) {
super(readerId, events, secondsToRun, 0, start, stats, readerGrp, timeout);
PerfStats stats, String streamName, String readerGrp, int timeout, boolean writeAndRead) {
super(readerId, events, secondsToRun, 0, start, stats, streamName, readerGrp, timeout);

this.writeAndRead = writeAndRead;
this.perf = createBenchmark();
Expand All @@ -46,12 +46,12 @@ private Performance createBenchmark() {
/**
* read the data.
*/
public abstract byte[] readData();
public abstract byte[] readData() throws IOException;

/**
* close the consumer/reader.
*/
public abstract void close();
public abstract void close() throws IOException;

@Override
public Void call() throws InterruptedException, ExecutionException, IOException {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/io/perf/core/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@ public abstract class Worker {
public final int messageSize;
public final int timeout;
public final String streamName;
public final String groupName;
public final long startTime;
public final PerfStats stats;
public final int secondsToRun;

Worker(int sensorId, int events, int secondsToRun,
Worker(int workerID, int events, int secondsToRun,
int messageSize, long start, PerfStats stats,
String streamName, int timeout) {
this.workerID = sensorId;
String streamName, String groupName, int timeout) {
this.workerID = workerID;
this.events = events;
this.secondsToRun = secondsToRun;
this.startTime = start;
this.stats = stats;
this.streamName = streamName;
this.groupName = groupName;
this.messageSize = messageSize;
this.timeout = timeout;
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/perf/core/WriterWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public abstract class WriterWorker extends Worker implements Callable<Void> {

public WriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun,
boolean isRandomKey, int messageSize, long start,
PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead) {
PerfStats stats, String streamName, int timeout, int eventsPerSec, boolean writeAndRead) {

super(sensorId, events, secondsToRun, messageSize, start, stats, streamName, 0);
super(sensorId, events, secondsToRun, messageSize, start, stats, streamName, null, timeout);
this.eventsPerSec = eventsPerSec;
this.EventsPerFlush = EventsPerFlush;
this.writeAndRead = writeAndRead;
Expand Down Expand Up @@ -97,12 +97,12 @@ private Performance createBenchmark() {
/**
* Flush the producer data.
*/
public abstract void flush();
public abstract void flush() throws IOException;

/**
* Flush the producer data.
*/
public abstract void close();
public abstract void close() throws IOException;


@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/perf/drivers/kafka/KafkaReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class KafkaReaderWorker extends ReaderWorker {
public KafkaReaderWorker(int readerId, int events, int secondsToRun,
long start, PerfStats stats, String partition,
int timeout, boolean writeAndRead, Properties consumerProps) {
super(readerId, events, secondsToRun, start, stats, partition, timeout, writeAndRead);
super(readerId, events, secondsToRun, start, stats, null, partition, timeout, writeAndRead);

this.consumer = new KafkaConsumer<>(consumerProps);
this.consumer.subscribe(Arrays.asList(partition));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/perf/drivers/kafka/KafkaWriterWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public class KafkaWriterWorker extends WriterWorker {

public KafkaWriterWorker(int sensorId, int events, int flushEvents,
int secondsToRun, boolean isRandomKey, int messageSize,
long start, PerfStats stats, String streamName,
long start, PerfStats stats, String streamName, int timeout,
int eventsPerSec, boolean writeAndRead, Properties producerProps) {

super(sensorId, events, flushEvents,
secondsToRun, isRandomKey, messageSize,
start, stats, streamName, eventsPerSec, writeAndRead);
start, stats, streamName, timeout, eventsPerSec, writeAndRead);

this.producer = new KafkaProducer<>(producerProps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class PravegaReaderWorker extends ReaderWorker {
private final EventStreamReader<byte[]> reader;

public PravegaReaderWorker(int readerId, int events, int secondsToRun,
long start, PerfStats stats, String readergrp,
long start, PerfStats stats, String streamName, String readergrp,
int timeout, boolean writeAndRead, ClientFactory factory) {
super(readerId, events, secondsToRun, start, stats, readergrp, timeout, writeAndRead);
super(readerId, events, secondsToRun, start, stats, streamName, readergrp, timeout, writeAndRead);

final String readerSt = Integer.toString(readerId);
reader = factory.createReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
public PravegaTransactionWriterWorker(int sensorId, int events,
int secondsToRun, boolean isRandomKey,
int messageSize, long start,
PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead,
PerfStats stats, String streamName, int timeout,
int eventsPerSec, boolean writeAndRead,
ClientFactory factory, int transactionsPerCommit) {

super(sensorId, events, Integer.MAX_VALUE, secondsToRun, isRandomKey,
messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory);
messageSize, start, stats, streamName, timeout, eventsPerSec, writeAndRead, factory);

this.transactionsPerCommit = transactionsPerCommit;
eventCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ public class PravegaWriterWorker extends WriterWorker {

public PravegaWriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun,
boolean isRandomKey, int messageSize, long start,
PerfStats stats, String streamName, int eventsPerSec,
PerfStats stats, String streamName, int timeout, int eventsPerSec,
boolean writeAndRead, ClientFactory factory) {

super(sensorId, events, EventsPerFlush,
secondsToRun, isRandomKey, messageSize, start,
stats, streamName, eventsPerSec, writeAndRead);
stats, streamName, timeout, eventsPerSec, writeAndRead);

this.producer = factory.createEventWriter(streamName,
new ByteArraySerializer(),
Expand Down
Loading

0 comments on commit 46edffb

Please sign in to comment.