Skip to content

Commit

Permalink
less copying, primitive builder (#776)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 26, 2022
1 parent fac8c01 commit 6ebca4a
Show file tree
Hide file tree
Showing 10 changed files with 934 additions and 222 deletions.
44 changes: 41 additions & 3 deletions src/main/java/io/nats/client/impl/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.BiConsumer;

import static io.nats.client.support.NatsConstants.*;
import static java.nio.charset.StandardCharsets.US_ASCII;

/**
* An object that represents a map of keys to a list of values. It does not accept
Expand Down Expand Up @@ -357,7 +358,8 @@ public int serializedLength() {
return dataLength + NON_DATA_BYTES;
}

private static final int NON_DATA_BYTES = HEADER_VERSION_BYTES_PLUS_CRLF.length + 2;
private static final int HVCRLF_BYTES = HEADER_VERSION_BYTES_PLUS_CRLF.length;
private static final int NON_DATA_BYTES = HVCRLF_BYTES + 2;

/**
* Returns the serialized bytes.
Expand All @@ -366,16 +368,18 @@ public int serializedLength() {
*/
public byte[] getSerialized() {
if (serialized == null) {
serialized = appendSerialized(new ByteArrayBuilder(dataLength + NON_DATA_BYTES)).toByteArray();
serialized = new byte[serializedLength()];
serializeToArray(0, serialized);
}
return serialized;
}

/**
* @deprecated
* Appends the serialized bytes to the builder.
*
* @return the builder
*/
@Deprecated
public ByteArrayBuilder appendSerialized(ByteArrayBuilder bab) {
bab.append(HEADER_VERSION_BYTES_PLUS_CRLF);
for (String key : valuesMap.keySet()) {
Expand All @@ -390,6 +394,40 @@ public ByteArrayBuilder appendSerialized(ByteArrayBuilder bab) {
return bab;
}

/**
* Write the header to the byte array. Assumes that the caller has
* already validated that the destination array is large enough by using getSerialized()
* @param destPosition the position index in destination byte array to start
* @param dest the byte array to write to
* @return the length of the header
*/
public int serializeToArray(int destPosition, byte[] dest) {
System.arraycopy(HEADER_VERSION_BYTES_PLUS_CRLF, 0, dest, destPosition, HVCRLF_BYTES);
destPosition += HVCRLF_BYTES;

for (Map.Entry<String, List<String>> entry : valuesMap.entrySet()) {
List<String> values = entry.getValue();
for (String value : values) {
byte[] bytes = entry.getKey().getBytes(US_ASCII);
System.arraycopy(bytes, 0, dest, destPosition, bytes.length);
destPosition += bytes.length;

dest[destPosition++] = COLON;

bytes = value.getBytes(US_ASCII);
System.arraycopy(bytes, 0, dest, destPosition, bytes.length);
destPosition += bytes.length;

dest[destPosition++] = CR;
dest[destPosition++] = LF;
}
}
dest[destPosition++] = CR;
dest[destPosition] = LF;

return serializedLength();
}

/**
* Check the key to ensure it matches the specification for keys.
*
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ boolean push(NatsMessage msg, boolean internal) {
// If we aren't running, then we need to obey the filter lock
// to avoid ordering problems
if (!internal && this.discardWhenFull) {
boolean myOffer = this.queue.offer(msg);
return myOffer;
return this.queue.offer(msg);
}
if (!this.offer(msg)) {
throw new IllegalStateException("Output queue is full " + queue.size());
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ void unsubscribe(NatsSubscription sub, int after) {
}

if (!isConnected()) {
return;// We will setup sub on reconnect or ignore
return; // We will set up sub on reconnect or ignore
}

sendUnsub(sub, after);
Expand Down Expand Up @@ -905,7 +905,7 @@ String reSubscribe(NatsSubscription sub, String subject, String queueName) {

void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
if (!isConnected()) {
return; // We will setup sub on reconnect or ignore
return; // We will set up sub on reconnect or ignore
}

ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
Expand Down Expand Up @@ -1208,7 +1208,7 @@ void sendConnect(String serverURI) throws IOException {
ServerInfo info = this.serverInfo.get();
CharBuffer connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired(), info.getNonce());
ByteArrayBuilder bab =
new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), ByteArrayBuilder.DEFAULT_OTHER_ALLOCATION, UTF_8)
new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
.append(CONNECT_SP_BYTES).append(connectOptions);
queueInternalOutgoing(new ProtocolMessage(bab));
} catch (Exception exp) {
Expand Down Expand Up @@ -1250,13 +1250,12 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
}

CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
NatsMessage msg = new ProtocolMessage(OP_PING_BYTES);
pongQueue.add(pongFuture);

if (treatAsInternal) {
queueInternalOutgoing(msg);
queueInternalOutgoing(new ProtocolMessage(OP_PING_BYTES));
} else {
queueOutgoing(msg);
queueOutgoing(new ProtocolMessage(OP_PING_BYTES));
}

this.needPing.set(true);
Expand All @@ -1265,7 +1264,7 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
}

void sendPong() {
queueInternalOutgoing( new ProtocolMessage(OP_PONG_BYTES) );
queueInternalOutgoing(new ProtocolMessage(OP_PONG_BYTES));
}

// Called by the reader
Expand Down
64 changes: 32 additions & 32 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import static io.nats.client.support.NatsConstants.OP_PING_BYTES;
import static io.nats.client.support.NatsConstants.OP_PONG_BYTES;
import static io.nats.client.support.BuilderBase.bufferAllocSize;
import static io.nats.client.support.NatsConstants.*;

class NatsConnectionWriter implements Runnable {
private static final int BUFFER_BLOCK_SIZE = 256;

private final NatsConnection connection;

Expand All @@ -41,6 +42,7 @@ class NatsConnectionWriter implements Runnable {
private final ReentrantLock startStopLock;

private byte[] sendBuffer;
private final AtomicInteger sendBufferLength;

private final MessageQueue outgoing;
private final MessageQueue reconnectOutgoing;
Expand All @@ -56,14 +58,15 @@ class NatsConnectionWriter implements Runnable {
((CompletableFuture<Boolean>)this.stopped).complete(Boolean.TRUE); // we are stopped on creation

Options options = connection.getOptions();
int bufSize = options.getBufferSize();
this.sendBuffer = new byte[bufSize];
int sbl = bufferAllocSize(options.getBufferSize(), BUFFER_BLOCK_SIZE);
sendBufferLength = new AtomicInteger(sbl);
sendBuffer = new byte[sbl];

outgoing = new MessageQueue(true,
options.getMaxMessagesInOutgoingQueue(),
options.isDiscardMessagesWhenOutgoingQueueFull());

// The reconnect buffer contains internal messages, and we will keep it unlimited in size
// The "reconnect" buffer contains internal messages, and we will keep it unlimited in size
reconnectOutgoing = new MessageQueue(true, 0);
reconnectBufferSize = options.getReconnectBufferSize();
}
Expand Down Expand Up @@ -95,8 +98,8 @@ Future<Boolean> stop() {
this.reconnectOutgoing.pause();
// Clear old ping/pong requests
this.outgoing.filter((msg) ->
Arrays.equals(OP_PING_BYTES, msg.getProtocolBytes())
|| Arrays.equals(OP_PONG_BYTES, msg.getProtocolBytes()));
msg.isProtocol() &&
(msg.protocolBab.equals(OP_PING_BYTES) || msg.protocolBab.equals(OP_PONG_BYTES)));

} finally {
this.startStopLock.unlock();
Expand All @@ -106,48 +109,45 @@ Future<Boolean> stop() {
}

synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, NatsStatistics stats)
throws IOException {
throws IOException {

int sendPosition = 0;
int sbl = sendBufferLength.get();

while (msg != null) {
long size = msg.getSizeInBytes();

if (sendPosition + size > sendBuffer.length) {
if (sendPosition == 0) { // have to resize
this.sendBuffer = new byte[(int)Math.max(sendBuffer.length + size, sendBuffer.length * 2L)];
} else { // else send and continue with current message
if (sendPosition + size > sbl) {
if (sendPosition > 0) {
dataPort.write(sendBuffer, sendPosition);
connection.getNatsStatistics().registerWrite(sendPosition);
sendPosition = 0;

// go back to the start of the loop, to ensure we check for resizing the buffer again
continue;
}
if (size > sbl) { // have to resize b/c can't fit 1 message
sbl = bufferAllocSize((int)size, BUFFER_BLOCK_SIZE);
sendBufferLength.set(sbl);
sendBuffer = new byte[sbl];
}
}

byte[] bytes = msg.getProtocolBytes();
System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
sendPosition += bytes.length;
int blen = msg.protocolBab.length();
System.arraycopy(msg.protocolBab.internalArray(), 0, sendBuffer, sendPosition, blen);
sendPosition += blen;

sendBuffer[sendPosition++] = '\r';
sendBuffer[sendPosition++] = '\n';
sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;

if (!msg.isProtocol()) {
bytes = msg.getSerializedHeader();
if (bytes != null && bytes.length > 0) {
System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
sendPosition += bytes.length;
}
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);

bytes = msg.getData(); // guaranteed to not be null
byte[] bytes = msg.getData(); // guaranteed to not be null
if (bytes.length > 0) {
System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
sendPosition += bytes.length;
}

sendBuffer[sendPosition++] = '\r';
sendBuffer[sendPosition++] = '\n';
sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;
}

stats.incrementOutMsgs();
Expand All @@ -174,9 +174,9 @@ public void run() {
NatsMessage msg = null;

if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxAccumulate, reconnectWait);
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), maxAccumulate, reconnectWait);
} else {
msg = this.outgoing.accumulate(this.sendBuffer.length, maxAccumulate, waitForMessage);
msg = this.outgoing.accumulate(sendBufferLength.get(), maxAccumulate, waitForMessage);
}

if (msg == null) { // Make sure we are still running
Expand All @@ -199,7 +199,7 @@ void setReconnectMode(boolean tf) {
}

boolean canQueueDuringReconnect(NatsMessage msg) {
// don't over fill the send buffer while waiting to reconnect
// don't over fill the "send" buffer while waiting to reconnect
return (reconnectBufferSize < 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < reconnectBufferSize);
}

Expand Down
Loading

0 comments on commit 6ebca4a

Please sign in to comment.