Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo-Malay committed Oct 20, 2024
0 parents commit fee5c56
Show file tree
Hide file tree
Showing 11 changed files with 988 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/*.class
/abc/*
246 changes: 246 additions & 0 deletions ChandyLamport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import java.util.*;
import java.io.*;
import java.net.*;

enum Color {
BLUE, RED
};

public class ChandyLamport {
public Node node;
public int parentId;

public int markersSent = 0;
public int markerReceived = 0;

public int msgSent = 0;
public int msgReceived = 0;

public Color color;

public Map<Integer, Vector<Integer>> localSs = new HashMap<>();
public boolean state;

public ChandyLamport(Node node) {
this.node = node;
this.color = Color.BLUE;
this.state = false;
}

public void initSpanningTree() throws Exception {
System.out.println("[INIT] Snapshot Spanning process at NODE: " + this.node.id);

this.color = Color.RED;

this.printStatus();

for (Map.Entry<Integer, Socket> entry : node.idToChannelMap.entrySet()) {
Socket channel = entry.getValue();
Message msg = new Message(node.id);
System.out.println("[CL] Sending " + msg.messageType + " to Node" + entry.getKey());
Client.sendMsg(msg, channel, node);
this.markersSent += 1;
}
}

public void printStatus() {
System.out.println("========== Snapshot Status ==========");
System.out.println("Color: " + color);
System.out.println("MARKERS Sent: " + markersSent);
System.out.println("MARKERS Received:" + markerReceived);
System.out.println("====================================\n");
}

public void handleMarkerMessageFromParent(Message msg) throws Exception {
if (this.color == Color.RED) {
System.out.println("[REJECTED] MARKER message from Node " + msg.id);
Message rejectMarker = new Message();
Socket channel = this.node.idToChannelMap.get(msg.id);
synchronized(node) {
Client.sendMsg(rejectMarker, channel, node);
}
return;
}

this.color = Color.RED;
this.parentId = msg.id;

for (Map.Entry<Integer, Socket> entry : node.idToChannelMap.entrySet()) {
Socket channel = entry.getValue();

Message msg = new Message(node.id);
synchronized (node) {
Client.sendMsg(msg, channel, node);
this.markersSent++;
}
}

System.out.println("[ACCEPTED] MARKER message from Node " + msg.id);
checkTreeCollapse();
}

public void handleMarkerRejectionMsg(Message msg) throws Exception {
this.markerReceived += 1;
checkTreeCollapse();
}

public void handleSnapshotResetMsg(Message msg) throws Exception {
if (this.color == Color.BLUE) return;
synchronized (node) {
System.out.println("[SNAPSHOT] Snapshot Reset");
this.reset();
}

for (Map.Entry<Integer, Socket> entry : node.idToChannelMap.entrySet()) {
if (entry.getKey() == 0 || msg.parents.contains(entry.getKey())) continue;
Socket channel = entry.getValue();

Set<Integer> parents = new HashSet<>(msg.parents);
parents.add(this.node.id);
Message resetMsg = new Message(msg.message, parents);
synchronized (node) {
Client.sendMsg(resetMsg, channel, node);
}
}
}

public void handleMarkerRepliesFromChild(Message msg) throws Exception {
this.localSs.putAll(msg.localSs);

this.msgSent += msg.msgSent;
this.msgReceived += msg.msgReceived;

if (msg.state == true) {
this.state = true;
}

this.markerReceived++;
System.out.println("[ACCEPTED] MARKER message from Node " + msg.id);
printStatus();
checkTreeCollapse();
};

public void checkTreeCollapse() throws Exception {
System.out.println("[COLLAPSE] Tree collapse at Node-" + node.id);
if (markersSent == markerReceived) {
this.localSs.put(node.id, node.clock);
this.msgSent += node.msgSent;
this.msgReceived += node.msgReceived;
if (node.state == true) {
System.out.println("[ALERT] Node is still active");
this.state = true;
}

genOutput(node.id, node.clock);

if (node.id == 0) {
handleConvergence();
return;
}
Message markerReplyMsg = new Message(
node.id,
localSs,
state,
msgSent,
msgReceived);
Client.sendMsg(markerReplyMsg, node.idToChannelMap.get(parentId), node);
}

}

public void handleConvergence() throws Exception {
System.out.println("=============== Convergence ===============");
System.out.println("Snapshots(Local): " + localSs);
System.out.println("Messages sent: " + msgSent);
System.out.println("Messages received: " + msgReceived);
System.out.println("States gathered: " + state);
System.out.println("=============================================\n");
verifyConsistency(localSs, node.totalNodes);
this.initSnapshotReset();
}

public void initSnapshotReset() throws Exception {
System.out.println("[INIT] Snapshot Reset");

this.color = Color.BLUE;
Boolean flag = false;

for (Map.Entry<Integer, Socket> entry : node.idToChannelMap.entrySet()) {
Socket channel = entry.getValue();
String msgText;
if (this.state == true || this.msgSent != this.msgReceived) {
msgText = "System not terminated";
} else {
msgText = "System terminated";
flag = true;
}
Set<Integer> parents = new HashSet<>();
parents.add(0);
Message msg = new Message(msgText, parents);
synchronized (node) {
Client.sendMsg(msg, channel, node);
}
}
this.reset();
if (node.id == 0 && !flag) {
System.out.println("[SNAPSHOT] Not Terminated");
try {
System.out.println("[SNAPSHOT] Process delayed for " + node.snapshotDelay);
Thread.sleep(this.node.snapshotDelay);
initSpanningTree();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("[SNAPSHOT]: Terminated");
}
}

public void reset() {
this.state = false;
this.color = Color.BLUE;
this.markerReceived = 0;
this.markersSent = 0;
this.msgSent = 0;
this.msgReceived = 0;
this.localSs = new HashMap<>();
}

public static void verifyConsistency(Map<Integer, Vector<Integer>> gatheredlocalSs, int n) {
boolean flag = true;
for (Map.Entry<Integer, Vector<Integer>> entry : gatheredlocalSs.entrySet()) {
int curr = entry.getKey();
for (int i = 0; i < n; i++) {
if (gatheredlocalSs.containsKey(i)) {
int ref = gatheredlocalSs.get(i).get(i);
for (int j = 0; j < n; j++) {
if (gatheredlocalSs.containsKey(j)) {
if (gatheredlocalSs.get(j).get(i) > ref) {
flag = false;
}
}
}
}
}
}

System.out.println("================================");
System.out.println("Conistency: " + (flag ? "VERIFIED" : "INVALID"));
System.out.println("================================\n");
}

public static void genOutput(int nodeId, Vector<Integer> clock) throws Exception {
String filename = "config-" + nodeId + ".out";

FileOutputStream stream = new FileOutputStream(filename, true);
PrintWriter writer = new PrintWriter(stream);

for (Integer i : clock) {
writer.print(i + " ");
}
writer.println();

writer.close();
stream.close();
}
}
146 changes: 146 additions & 0 deletions Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import java.util.*;
import java.io.*;
import java.net.*;

public class Client {

List<Socket> channelList;
Node node;

public Client(Node node) {
this.node = node;
synchronized (node) {
this.channelList = connectChannels(node);
}
}

public List<Socket> connectChannels(Node node) {
System.out.println("[CLIENT] Making channel array...");
List<Socket> channelList = new ArrayList<>();
for (Integer neighbour : node.neighbours.get(node.id)) {
String host = node.getHost(neighbour);
int port = node.getPort(neighbour);
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
channelList.add(client);
node.idToChannelMap.put(node.hostToId_PortMap.get(host).get(0), client);
System.out.println("[CLIENT] Connected to " + host + ":" + port);
} catch (IOException error) {
System.out.println("[CLIENT] Unable to connect to " + host + ":" + port);
}
}
return channelList;
}

public void mapProtocol() {
while (true) {
if (node.msgSent >= node.maxNumber) {
System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) permanently...");
node.state = false;
sendCustomEnd();
node.pem_passive = true;
break;
}

if (node.state == true) {
Random random = new Random();
int count = random.nextInt(node.maxPerActive - node.minPerActive + 1) + node.minPerActive;
synchronized (node) {
sendBulkMsg(count);
}
} else {
try {
System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) after sending " + node.msgSent + "/"
+ node.maxNumber + " messages");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public void sendBulkMsg(int count) {

// System.out.println(String.format("Sending %d messages to neighbours...",
// count));
Random random = new Random();

for (int i = 0; i < count; i++) {
if (node.msgSent >= node.maxNumber) {
node.state = false;
break;
}
int randomNumber = random.nextInt(this.channelList.size());
int destination = node.neighbours.get(node.id).get(randomNumber);
// System.out.println("Sent a message to " + destination);
node.sndClk.set(destination, node.sndClk.get(destination) + 1);
Socket channel = channelList.get(randomNumber);
String messageString = "Hello " + node.name + " (" + node.msgSent + 1 + "/" + node.maxNumber + ")";
Message msg = new Message(node.id, node.clock, messageString);
Client.sendMsg(msg, channel, node);

try {
Thread.sleep(node.minSendDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
node.changeState();

}

public void sendCustomEnd() {
for (Socket channel : this.channelList) {
Message msg = new Message(node.id, node.id);
Client.sendMsg(msg, channel, node);
}

try {
node.pem_passive = true;
Thread.sleep(node.minSendDelay);
if (node.pem_passive && node.custom_end == node.neighbours.get(node.id).size())
node.printNodeVectorClock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void sendMsg(Message msg, Socket channel, Node node) {
try {
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);

byte[] msgBytes = msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes); // Send message
dataOut.flush();

if (msg.messageType == MessageType.APPLICATION) {
int prevEntry = node.clock.get(node.id);
node.clock.set(node.id, prevEntry + 1);
node.msgSent++;
}
} catch (IOException error) {
error.printStackTrace();
}
}

public void init() {
Thread client = new Thread(() -> {
System.out.println("[CLIENT] Starting...");
try {
if (node.id == 0) {
node.changeState();
}
System.out.println("[CLIENT] Init Map Protocol...");
node.client.mapProtocol();
} catch (Exception e) {
e.printStackTrace();
}
});
client.start();
}
}
Loading

0 comments on commit fee5c56

Please sign in to comment.