Skip to content

Commit

Permalink
lots of fixes for multi-thread WS/sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
codybum committed Feb 22, 2024
1 parent f1099c5 commit 78e28e8
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 50 deletions.
150 changes: 113 additions & 37 deletions src/main/java/crescoclient/core/WSInterface.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package crescoclient.core;

import com.google.gson.Gson;
import crescoclient.dataplane.DataPlaneInterface;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
Expand All @@ -14,9 +16,7 @@
import java.net.URI;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -25,7 +25,6 @@ public class WSInterface
{
//private boolean isActive = false;
private AtomicBoolean isActive = new AtomicBoolean(false);

private AtomicBoolean sessionLock = new AtomicBoolean();
private String regionName;
private String agentName;
Expand All @@ -43,7 +42,7 @@ public class WSInterface

private int connectionTimeout;

private final int idleTimeout = 300;
private final int idleTimeout = 30 * 1000;

private String url;

Expand All @@ -59,6 +58,8 @@ public WSInterface(Map<String,String> wsConfig, WSCallback wsCallback) {
this.request.addExtensions("permessage-deflate");
this.request.setHeader("cresco_service_key", wsConfig.get("service_key"));

//clean up sessions
sessionCleanup();
}

public String getRegionName() {
Expand Down Expand Up @@ -129,13 +130,14 @@ public boolean connect() {
ssl.setValidateCerts(false);
ssl.setValidatePeerCerts(false);
ssl.setEndpointIdentificationAlgorithm(null);
ssl.setIncludeProtocols("TLSv1.2", "TLSv1.3");
//ssl.setIncludeProtocols("TLSv1.2", "TLSv1.3");
//ssl.setEndpointIdentificationAlgorithm("HTTPS");
http = new HttpClient(ssl);
//http = new HttpClient();
client = new WebSocketClient(http);
//no idle timeout
client.getPolicy().setIdleTimeout(0);

//set buffers
client.getPolicy().setMaxTextMessageSize(1024 * 1024 * 32);
client.getPolicy().setMaxTextMessageBufferSize(1024 * 1024 * 128);
Expand All @@ -147,23 +149,13 @@ public boolean connect() {
http.start();
client.start();

/*
WSocketImp socket = new WSocketImp(new WSPassThroughCallback());
Future<Session> fut = client.connect(socket, URI.create(url), request);
Session session = fut.get();
//Set region and agent info
//get an initial session in order to get the agent info
getSession(true);
setAgentInfo(http);
//set connected
//isConnected = session.isOpen();
*/
isActive.set(true);


} catch (Throwable t) {
System.out.println("WHAT TYPE ERROR: " + t.getMessage());
LOG.warn(t);
} finally {
//stop(http);
Expand Down Expand Up @@ -197,7 +189,7 @@ public void run() {

//clear out previous
clearWS();

//System.out.println("start()");
while ((isReconnect.get()) && (!isActive.get())) {
try {
connect();
Expand Down Expand Up @@ -229,23 +221,31 @@ public void setIsReconnect(boolean isReconnect) {
}

public boolean connected() {
boolean isConnected = false;
boolean isConencted = false;
if(getIsActive()) {
boolean isSession;
synchronized (sessionLock) {
isSession = sessionMap.containsKey(Thread.currentThread().getId());
}
if (!isSession) {
getSession();
if(client.getState().equals("STARTED")) {
isConencted = true;
}
}
return isConencted;

}
public boolean SessionConnected() {
boolean isConnected = false;
if(getIsActive()) {
synchronized (sessionLock) {
isConnected = sessionMap.get(Thread.currentThread().getId()).isOpen();
if(sessionMap.containsKey(Thread.currentThread().getId())) {
isConnected = sessionMap.get(Thread.currentThread().getId()).isOpen();
if(!isConnected) {
sessionMap.remove(Thread.currentThread().getId());
}
}
}
}
return isConnected;
}

public Session createSession() {
public Session createSession(boolean setIdleTimeout) {
Session session = null;
try {
WSocketImp socket = new WSocketImp(new WSPassThroughCallback());
Expand All @@ -255,30 +255,40 @@ public Session createSession() {
//System.out.println(URI.create(url));
Future<Session> fut = client.connect(socket, URI.create(url), request);
session = fut.get();
if(regionName == null) {
setAgentInfo(http);
if(setIdleTimeout) {
session.setIdleTimeout(idleTimeout);
}
//if(regionName == null) {
// setAgentInfo(http);
//}
//System.out.println("CREATE SESSIONS Thread: " + Thread.currentThread().getId());
} catch (Exception ex) {
System.out.println("createSession() Error: " + ex.getMessage());
ex.printStackTrace();
}
return session;
}
public Session getSession() {
return getSession(false);
}
public Session getSession(boolean isTemp) {
Session session;

boolean sessionExists;
boolean sessionExists = false;
synchronized (sessionLock) {
sessionExists = sessionMap.containsKey(Thread.currentThread().getId());
if(sessionMap.containsKey(Thread.currentThread().getId())) {
sessionExists = SessionConnected();
}
}
if(sessionExists) {
//System.out.println("session exists");
session = sessionMap.get(Thread.currentThread().getId());
} else {
session = createSession();
//System.out.println("session create");
session = createSession(isTemp);
synchronized (sessionLock) {
sessionMap.put(Thread.currentThread().getId(), session);
}
}

return session;
}

Expand All @@ -298,6 +308,70 @@ private void clearWS() {
}
}

private int getSessionCount() {
synchronized (sessionLock) {
return sessionMap.size();
}
}

private List<Long> getSessionList() {
List<Long> sessionList;
synchronized (sessionLock) {
sessionList = new ArrayList<>(sessionMap.keySet());
}
return sessionList;
}

private void sessionCleanup() {

try {

//setup performance timer
Timer timer = new Timer();

TimerTask task = new TimerTask() {
@Override
public void run() {
try {
int sessionCount = getSessionCount();
if(sessionCount > 0) {

//System.out.println("thread: " + Thread.currentThread().getId() + " session count: " + sessionCount);
List<Long> sessionList = getSessionList();
int count = 1;
for (Long sessionId : sessionList) {
boolean isOpen;
synchronized (sessionLock) {
isOpen = sessionMap.get(sessionId).isOpen();
}
//System.out.println("thread: " + Thread.currentThread().getId() + " " + count + " of " + sessionList.size() +" sessionId: " + sessionId + " is open " + isOpen);
count++;
if (!isOpen) {
synchronized (sessionLock) {
sessionMap.remove(sessionId);
}
}
}
//System.out.println("-");
}

} catch (Exception ex) {
ex.printStackTrace();
}

}
};

// Schedule the timer task to run every second
timer.schedule(task, 0, 30 * 1000);
} catch (Exception ex) {
ex.printStackTrace();
}


}


private void stopLC(LifeCycle lifeCycle) {
try
{
Expand Down Expand Up @@ -335,13 +409,15 @@ public void onMessage(byte[] b, int offset, int length) {
@Override
public void onClose(int statusCode, String reason) {
wsCallback.onClose(statusCode,reason);

/*
if(isReconnect.get()) {
if(isActive.get()) {
isActive.set(false);
System.out.println("1");
start(connectionTimeout);
}
}
*/

}
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/crescoclient/core/WSocketImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public void onClose(int statusCode, String reason)
@OnWebSocketError
public void onError(Throwable cause)
{
LOG.warn(cause);
wSStatusCallback.onError(cause);
if(cause instanceof org.eclipse.jetty.websocket.api.CloseException) {
LOG.debug(cause);
} else {
LOG.warn(cause);
wSStatusCallback.onError(cause);
}
}

@OnWebSocketMessage
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/crescoclient/dataplane/DataPlaneInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void send(String message) {

try {
if(wsInterface.connected()) {
//System.out.println("DP Send text getSession()");
wsInterface.getSession().getRemote().sendString(message);

} else {
System.out.println("WS not connected!");
}
Expand All @@ -76,6 +78,7 @@ public void send(ByteBuffer byteBuffer) {

try {
if(wsInterface.connected()) {
System.out.println("DB B");
wsInterface.getSession().getRemote().sendBytes(byteBuffer);
} else {
System.out.println("WS not connected!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public LogStreamerInterface(String host, int port, String serviceKey, OnMessageC
public void send(String message) {

try {

System.out.println("LOG");
System.out.println("LOG getSession()");
wsInterface.getSession().getRemote().sendString(message);

} catch (Exception e) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/crescoclient/msgevent/MsgEventInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public String getPluginName() {
public void send(String message) {

try {

wsInterface.getSession().getRemote().sendString(message);
System.out.println("MSG EVENT: " + message);
wsInterface.getSession(true).getRemote().sendString(message);

} catch (Exception e) {
e.printStackTrace();
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/example/TextPerformanceTesting.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void run() {
double bytesPerSecond = (double) bytesTransferred / (timeElapsed/1000);

// Print the bytes per second
System.out.println("Message per second: " + bytesPerSecond);
//System.out.println("Message per second: " + bytesPerSecond);
}
};

Expand Down Expand Up @@ -99,15 +99,15 @@ public void onMessage(byte[] b, int offset, int length) {

String input = "BRRRRUUU";

for(int i = 0; i<10; i++) {
for(int i = 0; i<3; i++) {
launchThread(dataPlaneSend, input);
}

while(true) {
//while(true) {
//for(int i = 0; i<25; i++) {
Thread.sleep(1000);
//Thread.sleep(1000);
//dataPlaneSend.send(str);
}
//}


} catch (Exception ex) {
Expand All @@ -121,15 +121,23 @@ private void launchThread(DataPlaneInterface dataPlaneSend, String input) {
new Thread() {
public void run() {
try {
while(true) {
for(int i = 0; i<100; i++) {
//while(true) {
for(int i = 0; i<10; i++) {
//Thread.sleep(1000);
//dataPlaneSend.send(str);
dataPlaneSend.send(input);
}
Thread.sleep(45 * 1000);

for(int i = 0; i<10; i++) {
//Thread.sleep(1000);
//dataPlaneSend.send(str);
dataPlaneSend.send(input);
}
System.out.println("ENDED");
//dataPlaneSend.send(input);
//Thread.sleep(1000);
}
//}

} catch(Exception v) {
System.out.println(v);
Expand Down

0 comments on commit 78e28e8

Please sign in to comment.