Skip to content

Commit

Permalink
ReentrantLock instead of synchronized (#1095)
Browse files Browse the repository at this point in the history
scottf authored Mar 14, 2024
1 parent 6c5ae3f commit 9d5fb0a
Showing 14 changed files with 372 additions and 216 deletions.
25 changes: 19 additions & 6 deletions src/examples/java/io/nats/examples/chaosTestApp/Output.java
Original file line number Diff line number Diff line change
@@ -27,13 +27,14 @@
import java.io.PrintStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.locks.ReentrantLock;

public class Output extends JPanel {
public enum Screen {Left, Main, Console}

static final Object workLock = new Object();
static final Object controlLock = new Object();
static final Object debugLock = new Object();
static final ReentrantLock workLock = new ReentrantLock();
static final ReentrantLock controlLock = new ReentrantLock();
static final ReentrantLock debugLock = new ReentrantLock();

static boolean console;
static boolean work;
@@ -202,7 +203,8 @@ private static String time() {

public static void workMessage(String label, String s) {
if (work) {
synchronized (workLock) {
workLock.lock();
try {
if (console) {
consoleMessage("WORK", label, s);
}
@@ -213,6 +215,9 @@ public static void workMessage(String label, String s) {
consoleMessage(null, label, s, workLog);
}
}
finally {
workLock.unlock();
}
}
}

@@ -225,7 +230,8 @@ public static void controlMessage(String label, String jvLabel, JsonValue jv) {
}

public static void controlMessage(String label, String s) {
synchronized (controlLock) {
controlLock.lock();
try {
if (console) {
consoleMessage(controlConsoleAreaLabel, label, s);
}
@@ -236,6 +242,9 @@ public static void controlMessage(String label, String s) {
consoleMessage(null, label, s, controlLog);
}
}
finally {
controlLock.unlock();
}
}

public static void dumpControl() {
@@ -258,7 +267,8 @@ private static void dump(String label, Document document) {

public static void debugMessage(String label, String s) {
if (debug) {
synchronized (debugLock) {
debugLock.lock();
try {
if (console) {
consoleMessage("DEBUG", label, s);
}
@@ -271,6 +281,9 @@ public static void debugMessage(String label, String s) {
consoleMessage("DEBUG", label, s + "\n", controlLog);
}
}
finally {
debugLock.unlock();
}
}
}

78 changes: 47 additions & 31 deletions src/main/java/io/nats/client/NUID.java
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@

package io.nats.client;

import java.util.concurrent.locks.ReentrantLock;

import static io.nats.client.support.RandomUtils.*;

/**
@@ -50,6 +52,7 @@ public final class NUID {
private long inc;

private static final NUID globalNUID;
private final ReentrantLock nextLock;

static {
globalNUID = new NUID();
@@ -69,6 +72,7 @@ static NUID getInstance() {
* SecureRandom instance.
*/
public NUID() {
nextLock = new ReentrantLock();
// Generate a cryto random int, 0 <= val < max to seed pseudorandom
seq = nextLong(PRAND, maxSeq);
inc = minInc + nextLong(PRAND, maxInc - minInc);
@@ -82,14 +86,14 @@ public NUID() {
/**
* @return the next NUID string from a shared global NUID instance
*/
public static synchronized String nextGlobal() {
public static String nextGlobal() {
return globalNUID.next();
}

/**
* @return the next sequence portion of the NUID string from a shared global NUID instance
*/
public static synchronized String nextGlobalSequence() {
public static String nextGlobalSequence() {
return globalNUID.nextSequence();
}

@@ -98,45 +102,57 @@ public static synchronized String nextGlobalSequence() {
*
* @return the next NUID string from this instance.
*/
public synchronized String next() {
// Increment and capture.
seq += inc;
if (seq >= maxSeq) {
randomizePrefix();
resetSequential();
public String next() {
nextLock.lock();
try {
// Increment and capture.
seq += inc;
if (seq >= maxSeq) {
randomizePrefix();
resetSequential();
}

// Copy prefix
char[] b = new char[totalLen];
System.arraycopy(pre, 0, b, 0, preLen);

// copy in the seq
int i = b.length;
for (long l = seq; i > preLen; l /= base) {
b[--i] = digits[(int) (l % base)];
}
return new String(b);
}

// Copy prefix
char[] b = new char[totalLen];
System.arraycopy(pre, 0, b, 0, preLen);

// copy in the seq
int i = b.length;
for (long l = seq; i > preLen; l /= base) {
b[--i] = digits[(int) (l % base)];
finally {
nextLock.unlock();
}
return new String(b);
}

/**
* Generate the next NUID string from this instance and return only the sequence portion.
* @return the next sequence portion of the NUID string from a shared global NUID instance
*/
public synchronized String nextSequence() {
// Increment and capture.
seq += inc;
if (seq >= maxSeq) {
randomizePrefix();
resetSequential();
public String nextSequence() {
nextLock.lock();
try {
// Increment and capture.
seq += inc;
if (seq >= maxSeq) {
randomizePrefix();
resetSequential();
}

char[] b = new char[seqLen];
// copy in the seq
int ix = seqLen;
for (long l = seq; ix > 0; l /= base) {
b[--ix] = digits[(int) (l % base)];
}
return new String(b);
}

char[] b = new char[seqLen];
// copy in the seq
int ix = seqLen;
for (long l = seq; ix > 0; l /= base) {
b[--ix] = digits[(int) (l % base)];
finally {
nextLock.unlock();
}
return new String(b);
}

// Resets the sequential portion of the NUID
29 changes: 23 additions & 6 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
@@ -22,13 +22,14 @@
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

abstract class MessageManager {
public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR}

protected static final int THRESHOLD = 3;

protected final Object stateChangeLock;
protected final ReentrantLock stateChangeLock;
protected final NatsConnection conn;
protected final SubscribeOptions so;
protected final boolean syncMode;
@@ -47,7 +48,7 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
protected Timer heartbeatTimer;

protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
stateChangeLock = new Object();
stateChangeLock = new ReentrantLock();

this.conn = conn;
this.so = so;
@@ -87,19 +88,24 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
abstract protected ManageResult manage(Message msg);

protected void trackJsMessage(Message msg) {
synchronized (stateChangeLock) {
stateChangeLock.lock();
try {
NatsJetStreamMetaData meta = msg.metaData();
lastStreamSeq = meta.streamSequence();
lastConsumerSeq++;
}
finally {
stateChangeLock.unlock();
}
}

protected void handleHeartbeatError() {
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
}

protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
synchronized (stateChangeLock) {
stateChangeLock.lock();
try {
idleHeartbeatSetting = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
if (idleHeartbeatSetting <= 0) {
alarmPeriodSetting = 0;
@@ -115,6 +121,9 @@ protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configM
hb = true;
}
}
finally {
stateChangeLock.unlock();
}
}

protected void updateLastMessageReceived() {
@@ -160,7 +169,8 @@ public String toString() {
}

protected void initOrResetHeartbeatTimer() {
synchronized (stateChangeLock) {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
// Same settings, just reuse the existing timer
if (heartbeatTimerTask.alarmPeriod == alarmPeriodSetting) {
@@ -178,16 +188,23 @@ protected void initOrResetHeartbeatTimer() {
heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSetting, alarmPeriodSetting);
updateLastMessageReceived();
}
finally {
stateChangeLock.unlock();
}
}

protected void shutdownHeartbeatTimer() {
synchronized (stateChangeLock) {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
heartbeatTimerTask.shutdown();
heartbeatTimerTask = null;
heartbeatTimer.cancel();
heartbeatTimer = null;
}
}
finally {
stateChangeLock.unlock();
}
}
}
Loading

0 comments on commit 9d5fb0a

Please sign in to comment.