Skip to content

Commit

Permalink
blam, multi-threaded client based on thread_id
Browse files Browse the repository at this point in the history
  • Loading branch information
codybum committed Feb 21, 2024
1 parent caf13e3 commit 47d1fed
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 26 deletions.
92 changes: 77 additions & 15 deletions src/main/java/crescoclient/core/WSInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.io.InputStream;
import java.net.Socket;
import java.net.URI;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -24,6 +25,8 @@ public class WSInterface
{
//private boolean isActive = false;
private AtomicBoolean isActive = new AtomicBoolean(false);

private AtomicBoolean sessionLock = new AtomicBoolean();
private String regionName;
private String agentName;
private String pluginName;
Expand All @@ -32,17 +35,30 @@ public class WSInterface
private final Logger LOG = Log.getLogger(WSInterface.class);
private HttpClient http;
private WebSocketClient client;
private Session session;

//private Session session;
Map<Long, Session> sessionMap;
private Map<String,String> wsConfig;

private WSCallback wsCallback;

private int connectionTimeout;

private final int idleTimeout = 300;

private String url;

private ClientUpgradeRequest request;

public WSInterface(Map<String,String> wsConfig, WSCallback wsCallback) {
this.wsConfig = wsConfig;
this.wsCallback = wsCallback;
this.sessionMap = Collections.synchronizedMap(new HashMap<>());
this.url = "wss://" + wsConfig.get("host") + ":" + wsConfig.get("port") + wsConfig.get("api_path");

this.request = new ClientUpgradeRequest();
this.request.addExtensions("permessage-deflate");
this.request.setHeader("cresco_service_key", wsConfig.get("service_key"));

}

public String getRegionName() {
Expand Down Expand Up @@ -96,6 +112,8 @@ public boolean serverListening(String host, int port)

public boolean connect() {

//System.out.println("THIS FIRST");

boolean isConnected = false;

if(!inConnect.get()) {
Expand All @@ -106,8 +124,6 @@ public boolean connect() {

if (serverListening(wsConfig.get("host"), Integer.parseInt(wsConfig.get("port")))) {

String url = "wss://" + wsConfig.get("host") + ":" + wsConfig.get("port") + wsConfig.get("api_path");

SslContextFactory ssl = new SslContextFactory.Client();
ssl.setTrustAll(true);
ssl.setValidateCerts(false);
Expand All @@ -125,31 +141,35 @@ public boolean connect() {
client.getPolicy().setMaxTextMessageBufferSize(1024 * 1024 * 128);
client.getPolicy().setMaxBinaryMessageSize(1024 * 1024 * 32);
client.getPolicy().setMaxBinaryMessageBufferSize(1024 * 1024 * 128);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.addExtensions("permessage-deflate");
request.setHeader("cresco_service_key", wsConfig.get("service_key"));

try {

http.start();
client.start();

WSInterfaceImpl socket = new WSInterfaceImpl(new WSPassThroughCallback());
/*
WSocketImp socket = new WSocketImp(new WSPassThroughCallback());
Future<Session> fut = client.connect(socket, URI.create(url), request);
session = fut.get();
Session session = fut.get();
//Set region and agent info
setAgentInfo(http);
//set connected
isConnected = session.isOpen();
//isConnected = session.isOpen();
*/
isActive.set(true);


} catch (Throwable t) {
LOG.warn(t);
} finally {
//stop(http);
//stop(client);
}
//System.out.println("DID YOU FINISHED");
} else {
inConnect.set(false);
LOG.warn("connect(): Remote server is not listening at host:" + wsConfig.get("host") + " port:" + wsConfig.get("port"));
Expand Down Expand Up @@ -209,14 +229,56 @@ public void setIsReconnect(boolean isReconnect) {
}

public boolean connected() {
if(session != null) {
return session.isOpen();
} else {
return false;
boolean isConnected = false;
if(getIsActive()) {
boolean isSession;
synchronized (sessionLock) {
isSession = sessionMap.containsKey(Thread.currentThread().getId());
}
if (!isSession) {
getSession();
}
synchronized (sessionLock) {
isConnected = sessionMap.get(Thread.currentThread().getId()).isOpen();
}
}
return isConnected;
}

public Session createSession() {
Session session = null;
try {
WSocketImp socket = new WSocketImp(new WSPassThroughCallback());
//System.out.println("url: " + url);
//System.out.println("request: " + request);
//System.out.println(client);
//System.out.println(URI.create(url));
Future<Session> fut = client.connect(socket, URI.create(url), request);
session = fut.get();
if(regionName == null) {
setAgentInfo(http);
}
} catch (Exception ex) {
ex.printStackTrace();
}
return session;
}
public Session getSession() {
Session session;

boolean sessionExists;
synchronized (sessionLock) {
sessionExists = sessionMap.containsKey(Thread.currentThread().getId());
}
if(sessionExists) {
session = sessionMap.get(Thread.currentThread().getId());
} else {
session = createSession();
synchronized (sessionLock) {
sessionMap.put(Thread.currentThread().getId(), session);
}
}

return session;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.*;

import java.io.InputStream;

@WebSocket
public class WSInterfaceImpl
public class WSocketImp
{

private final Logger LOG = Log.getLogger(WSInterfaceImpl.class);
private final Logger LOG = Log.getLogger(WSocketImp.class);
private WSCallback wSStatusCallback;
public WSInterfaceImpl(WSCallback wSStatusCallback) {
public WSocketImp(WSCallback wSStatusCallback) {
this.wSStatusCallback = wSStatusCallback;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class LogPrinter implements OnMessageCallback {
@Override
public void onMessage(String msg) {

System.out.println("DP LogPrinter: " + msg);
//System.out.println("DP LogPrinter: " + msg);
}

@Override
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/crescoclient/msgevent/MsgEventInterface.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package crescoclient.msgevent;

import com.google.common.primitives.Bytes;
import crescoclient.core.WSCallback;
import crescoclient.core.WSInterface;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.SynchronousQueue;
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/example/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ public static void main(String[] args) throws Exception {

CrescoClient client = new CrescoClient(host,port,service_key);
client.connect();
//System.out.println("ARE YOU BLOCKING?");

if(client.connected()) {

//System.out.println("ARE YOU BLOCKING? 2");
//BinaryPerformanceTesting binaryPerformanceTesting = new BinaryPerformanceTesting(client);
//binaryPerformanceTesting.runTest();
BinaryFileRepoTesting binaryFileRepoTesting = new BinaryFileRepoTesting(client);
binaryFileRepoTesting.runTest();

//BinaryFileRepoTesting binaryFileRepoTesting = new BinaryFileRepoTesting(client);
//binaryFileRepoTesting.runTest();
TextPerformanceTesting textPerformanceTesting = new TextPerformanceTesting(client);
textPerformanceTesting.runTest();

/*
Map<String,String> update = new HashMap<>();
Expand Down
133 changes: 133 additions & 0 deletions src/main/java/example/TextPerformanceTesting.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package example;

import com.google.gson.Gson;
import crescoclient.CrescoClient;
import crescoclient.core.OnMessageCallback;
import crescoclient.dataplane.DataPlaneInterface;

import java.util.*;

public class TextPerformanceTesting {

private CrescoClient client;
public long bytesTransferred = 0;
public TextPerformanceTesting(CrescoClient client) {
this.client = client;
}

public void runTest() {

try {

long startTime = System.currentTimeMillis();

//setup performance timer
Timer timer = new Timer();

TimerTask task = new TimerTask() {
@Override
public void run() {
// Get the number of bytes transferred

// Get the time it took to transfer the bytes
long timeElapsed = System.currentTimeMillis() - startTime;

// Calculate the bytes per second
double bytesPerSecond = (double) bytesTransferred / (timeElapsed/1000);

// Print the bytes per second
System.out.println("Message per second: " + bytesPerSecond);
}
};

// Schedule the timer task to run every second
timer.schedule(task, 0, 1000);

System.out.println("API: region: " + client.api.getAPIRegionName() + " agent: " + client.api.getAPIAgentName() + " plugin: " + client.api.getAPIPluginName());
String dst_region = client.api.getGlobalRegion();
String dst_agent = client.api.getGlobalAgent();
System.out.println("Global Controller: region: " + dst_region + " agent:" + dst_agent);
System.out.println("---");

//String queryString = "stream_test='" + "bin" + "'";
String identKey = "stream_name";
String identId = "1234";
//String streamQuery = "stream_name='" + identId + "'";
Map<String, String> configDB = new HashMap<>();
configDB.put("ident_key", identKey);
configDB.put("ident_id", identId);
//configDB.put("stream_query",identKey + "='" + identId + "' and type='" + "outgoing" + "'");
configDB.put("io_type_key", "type");
configDB.put("output_id", "output");
configDB.put("input_id", "output");
Gson gson = new Gson();

String queryString = gson.toJson(configDB);

class BytePrinter implements OnMessageCallback {

@Override
public void onMessage(String msg) {

//System.out.println("TEXT MESSAGE!");
bytesTransferred += 1;
}

@Override
public void onMessage(byte[] b, int offset, int length) {
//bytesTransferred = bytesTransferred + length;
//String s = new String(b, StandardCharsets.UTF_8);
//System.out.println("binary: " + s);
//System.out.println("length: " + b.length + " offset: " + offset + " length: " + length);
}
}

DataPlaneInterface dataPlaneRec = client.getDataPlane("", new BytePrinter());
dataPlaneRec.start();
while(!dataPlaneRec.connected()) {
Thread.sleep(1000);
}

DataPlaneInterface dataPlaneSend = client.getDataPlane(queryString);
dataPlaneSend.start();
while(!dataPlaneSend.connected()) {
Thread.sleep(1000);
}

String input = "BRRRRUUU";

for(int i = 0; i<50; i++) {
launchThread(dataPlaneSend, input);
}

while(true) {
Thread.sleep(1000);
//dataPlaneSend.send(str);
}


} catch (Exception ex) {
ex.printStackTrace();
}

}

private void launchThread(DataPlaneInterface dataPlaneSend, String input) {

new Thread() {
public void run() {
try {
while(true) {
dataPlaneSend.send(input);
//Thread.sleep(1000);
}

} catch(Exception v) {
System.out.println(v);
}
}
}.start();

}

}

0 comments on commit 47d1fed

Please sign in to comment.