diff --git a/ChandyLamport.java b/ChandyLamport.java deleted file mode 100644 index 27552ba..0000000 --- a/ChandyLamport.java +++ /dev/null @@ -1,246 +0,0 @@ -import java.util.*; -import java.io.*; -import java.net.*; - -enum Color { - BLUE, RED -}; - -public class ChandyLamport { - public Node node; - public int parentId; - - public int markersSent = 0; - public int markerReceived = 0; - - public int msgSent = 0; - public int msgReceived = 0; - - public Color color; - - public Map> localSs = new HashMap<>(); - public boolean state; - - public ChandyLamport(Node node) { - this.node = node; - this.color = Color.BLUE; - this.state = false; - } - - public void initSpanningTree() throws Exception { - System.out.println("[INIT] Snapshot Spanning process at NODE: " + this.node.id); - - this.color = Color.RED; - - this.printStatus(); - - for (Map.Entry entry : node.idToChannelMap.entrySet()) { - Socket channel = entry.getValue(); - Message msg = new Message(node.id); - System.out.println("[CL] Sending " + msg.messageType + " to Node" + entry.getKey()); - Client.sendMsg(msg, channel, node); - this.markersSent += 1; - } - } - - public void printStatus() { - System.out.println("========== Snapshot Status =========="); - System.out.println("Color: " + color); - System.out.println("MARKERS Sent: " + markersSent); - System.out.println("MARKERS Received:" + markerReceived); - System.out.println("====================================\n"); - } - - public void handleMarkerMessageFromParent(Message msg) throws Exception { - if (this.color == Color.RED) { - System.out.println("[REJECTED] MARKER message from Node " + msg.id); - Message rejectMarker = new Message(); - Socket channel = this.node.idToChannelMap.get(msg.id); - synchronized(node) { - Client.sendMsg(rejectMarker, channel, node); - } - return; - } - - this.color = Color.RED; - this.parentId = msg.id; - - for (Map.Entry entry : node.idToChannelMap.entrySet()) { - Socket channel = entry.getValue(); - - Message msg = new Message(node.id); - synchronized (node) { - Client.sendMsg(msg, channel, node); - this.markersSent++; - } - } - - System.out.println("[ACCEPTED] MARKER message from Node " + msg.id); - checkTreeCollapse(); - } - - public void handleMarkerRejectionMsg(Message msg) throws Exception { - this.markerReceived += 1; - checkTreeCollapse(); - } - - public void handleSnapshotResetMsg(Message msg) throws Exception { - if (this.color == Color.BLUE) return; - synchronized (node) { - System.out.println("[SNAPSHOT] Snapshot Reset"); - this.reset(); - } - - for (Map.Entry entry : node.idToChannelMap.entrySet()) { - if (entry.getKey() == 0 || msg.parents.contains(entry.getKey())) continue; - Socket channel = entry.getValue(); - - Set parents = new HashSet<>(msg.parents); - parents.add(this.node.id); - Message resetMsg = new Message(msg.message, parents); - synchronized (node) { - Client.sendMsg(resetMsg, channel, node); - } - } - } - - public void handleMarkerRepliesFromChild(Message msg) throws Exception { - this.localSs.putAll(msg.localSs); - - this.msgSent += msg.msgSent; - this.msgReceived += msg.msgReceived; - - if (msg.state == true) { - this.state = true; - } - - this.markerReceived++; - System.out.println("[ACCEPTED] MARKER message from Node " + msg.id); - printStatus(); - checkTreeCollapse(); - }; - - public void checkTreeCollapse() throws Exception { - System.out.println("[COLLAPSE] Tree collapse at Node-" + node.id); - if (markersSent == markerReceived) { - this.localSs.put(node.id, node.clock); - this.msgSent += node.msgSent; - this.msgReceived += node.msgReceived; - if (node.state == true) { - System.out.println("[ALERT] Node is still active"); - this.state = true; - } - - genOutput(node.id, node.clock); - - if (node.id == 0) { - handleConvergence(); - return; - } - Message markerReplyMsg = new Message( - node.id, - localSs, - state, - msgSent, - msgReceived); - Client.sendMsg(markerReplyMsg, node.idToChannelMap.get(parentId), node); - } - - } - - public void handleConvergence() throws Exception { - System.out.println("=============== Convergence ==============="); - System.out.println("Snapshots(Local): " + localSs); - System.out.println("Messages sent: " + msgSent); - System.out.println("Messages received: " + msgReceived); - System.out.println("States gathered: " + state); - System.out.println("=============================================\n"); - verifyConsistency(localSs, node.totalNodes); - this.initSnapshotReset(); - } - - public void initSnapshotReset() throws Exception { - System.out.println("[INIT] Snapshot Reset"); - - this.color = Color.BLUE; - Boolean flag = false; - - for (Map.Entry entry : node.idToChannelMap.entrySet()) { - Socket channel = entry.getValue(); - String msgText; - if (this.state == true || this.msgSent != this.msgReceived) { - msgText = "System not terminated"; - } else { - msgText = "System terminated"; - flag = true; - } - Set parents = new HashSet<>(); - parents.add(0); - Message msg = new Message(msgText, parents); - synchronized (node) { - Client.sendMsg(msg, channel, node); - } - } - this.reset(); - if (node.id == 0 && !flag) { - System.out.println("[SNAPSHOT] Not Terminated"); - try { - System.out.println("[SNAPSHOT] Process delayed for " + node.snapshotDelay); - Thread.sleep(this.node.snapshotDelay); - initSpanningTree(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } else { - System.out.println("[SNAPSHOT]: Terminated"); - } - } - - public void reset() { - this.state = false; - this.color = Color.BLUE; - this.markerReceived = 0; - this.markersSent = 0; - this.msgSent = 0; - this.msgReceived = 0; - this.localSs = new HashMap<>(); - } - - public static void verifyConsistency(Map> gatheredlocalSs, int n) { - boolean flag = true; - for (Map.Entry> entry : gatheredlocalSs.entrySet()) { - int curr = entry.getKey(); - for (int i = 0; i < n; i++) { - if (gatheredlocalSs.containsKey(i)) { - int ref = gatheredlocalSs.get(i).get(i); - for (int j = 0; j < n; j++) { - if (gatheredlocalSs.containsKey(j)) { - if (gatheredlocalSs.get(j).get(i) > ref) { - flag = false; - } - } - } - } - } - } - - System.out.println("================================"); - System.out.println("Conistency: " + (flag ? "VERIFIED" : "INVALID")); - System.out.println("================================\n"); - } - - public static void genOutput(int nodeId, Vector clock) throws Exception { - String filename = "config-" + nodeId + ".out"; - - FileOutputStream stream = new FileOutputStream(filename, true); - PrintWriter writer = new PrintWriter(stream); - - for (Integer i : clock) { - writer.print(i + " "); - } - writer.println(); - - writer.close(); - stream.close(); - } -} \ No newline at end of file diff --git a/Message.java b/Message.java index eb772c7..8ebd399 100644 --- a/Message.java +++ b/Message.java @@ -8,12 +8,14 @@ enum MessageType { public class Message implements Serializable { public int id = -1; public MessageType messageType; - public Vector clock; + public int clock; + String key; public Message(MessageType type, int id, Vector timestamp) { this.messageType = type; this.id = id; this.clock = timestamp; + this.key = key; } public byte[] toMessageBytes() throws IOException { diff --git a/Node.java b/Node.java index 4b849aa..f822d55 100644 --- a/Node.java +++ b/Node.java @@ -10,7 +10,6 @@ public class Node { int id; String name; String port; - List> neighbours = new ArrayList<>(); // Config int totalNodes; int requestDelay; @@ -44,11 +43,11 @@ public static void main(String[] args) { node = new Node(-1); // Parse the config file node.readConfig(); - // Init Vector Clock; - node.initVectorClock(); + // Init Keys + node.initKeys(); // Print details node.printNodeConfig(); - node.printNodeNeighbours(); + node.printNodeKeys(); // Server node.server = new Server(node.getPort(), node); @@ -137,6 +136,11 @@ public void readConfig() { } } + public void initKeys() { + for (int i = id + 1; i <= totalNodes; i++) { + this.keys.add(i); + } + } public String getHost() { return idToHost_PortMap.get(id).get(0); } @@ -181,6 +185,13 @@ public void printNodeConfig() { System.out.println("Max # of Request: " + maxRequest); System.out.println("=====================================\n"); } + public void printNodeKeys() { + System.out.println("============= Node Keys ============="); + for (Integer x : keys) { + System.out.print(x + ", "); + } + System.out.println("=====================================\n"); + } public void printNodeVectorClock() { int totalSent = 0, totalReceive = 0;