diff --git a/Client.java b/Client.java index 43544fc..b0baf23 100644 --- a/Client.java +++ b/Client.java @@ -1,132 +1,69 @@ -import java.util.*; import java.io.*; import java.net.*; public class Client { - - List channelList; Node node; public Client(Node node) { this.node = node; synchronized (node) { - this.channelList = connectChannels(node); + connectChannels(node); } } - public List connectChannels(Node node) { + public void connectChannels(Node node) { System.out.println("[CLIENT] Making channel array..."); - List channelList = new ArrayList<>(); - for (int i = 1; i <= node.totalNodes; i++) - { - if (i == node.id) continue; + for (int i = 1; i <= node.totalNodes; i++) { + if (i == node.id) + continue; String host = node.getHost(i); int port = node.getPort(i); try { Socket client = new Socket(); client.connect(new InetSocketAddress(host, port)); client.setKeepAlive(true); - channelList.add(client); node.idToChannelMap.put(node.hostToId_PortMap.get(host).get(0), client); System.out.println("[CLIENT] Connected to " + host + ":" + port); } catch (IOException error) { System.out.println("[CLIENT] Unable to connect to " + host + ":" + port); } } - return channelList; } - public void mapProtocol() { - while (true) { - if (node.msgSent >= node.maxNumber) { - System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) permanently..."); - node.state = false; - sendCustomEnd(); - node.pem_passive = true; - break; - } - - if (node.state == true) { - Random random = new Random(); - int count = random.nextInt(node.maxPerActive - node.minPerActive + 1) + node.minPerActive; - synchronized (node) { - sendBulkMsg(count); - } - } else { - try { - System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) after sending " + node.msgSent + "/" - + node.maxNumber + " messages"); - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + public void enterCS() { + // Start critical section + synchronized (node) { + node.under_cs = true; } + executeCS(); } - public void sendBulkMsg(int count) { - - // System.out.println(String.format("Sending %d messages to neighbours...", - // count)); - Random random = new Random(); - - for (int i = 0; i < count; i++) { - if (node.msgSent >= node.maxNumber) { - node.state = false; - break; - } - int randomNumber = random.nextInt(this.channelList.size()); - int destination = node.neighbours.get(node.id).get(randomNumber); - // System.out.println("Sent a message to " + destination); - node.sndClk.set(destination, node.sndClk.get(destination) + 1); - Socket channel = channelList.get(randomNumber); - String messageString = "Hello " + node.name + " (" + node.msgSent + 1 + "/" + node.maxNumber + ")"; - Message msg = new Message(node.id, node.clock, messageString); - Client.sendMsg(msg, channel, node); - - try { - Thread.sleep(node.minSendDelay); - } catch (InterruptedException e) { - e.printStackTrace(); - } + /** Function to perform critical section */ + public void executeCS() { + try { + Thread.sleep(node.executionTime); + leaveCS(); + } catch (InterruptedException e) { + e.printStackTrace(); } - node.changeState(); - } - public static void sendMsg(Message msg, Socket channel, Node node) { - try { - OutputStream outStream = channel.getOutputStream(); - DataOutputStream dataOut = new DataOutputStream(outStream); - - byte[] msgBytes = msg.toMessageBytes(); - dataOut.writeInt(msgBytes.length); - dataOut.write(msgBytes); // Send message - dataOut.flush(); - - if (msg.messageType == MessageType.APPLICATION) { - int prevEntry = node.clock.get(node.id); - node.clock.set(node.id, prevEntry + 1); - node.msgSent++; - } - } catch (IOException error) { - error.printStackTrace(); + public void leaveCS() { + synchronized (node) { + node.under_cs = false; } } -// --- - public static void csEnter() {} - public static void executeCS() {} - public static void csLeave() {} - - public static void requestKeys(int nodeId) { + /** Function to request key from respective process */ + public void requestKey(int nodeId) { // Send Msg to Specific Channel. Socket channel = node.idToChannelMap.get(nodeId); + Message req_msg = new Message(MessageType.REQUEST, node.id, 0, -1); try { OutputStream outStream = channel.getOutputStream(); DataOutputStream dataOut = new DataOutputStream(outStream); - byte[] msgBytes = msg.toMessageBytes(); + byte[] msgBytes = req_msg.toMessageBytes(); dataOut.writeInt(msgBytes.length); dataOut.write(msgBytes); dataOut.flush(); @@ -135,27 +72,73 @@ public static void requestKeys(int nodeId) { } } - public static boolean checkKeys() { + /** Function to send the key to the respective process */ + public void sendKey(Message msg, boolean needBack) { + if (node.keys.contains(msg.key)) { + synchronized (node) { + try { + while (node.keys.contains(msg.key)) { + node.keys.remove(msg.key); + } + + Message reply_msg = new Message(needBack ? MessageType.BOTH : MessageType.REPLY, node.id, 0, + node.id); + Socket channel = node.idToChannelMap.get(msg.id); + + OutputStream outStream = channel.getOutputStream(); + DataOutputStream dataOut = new DataOutputStream(outStream); + + byte[] msgBytes = reply_msg.toMessageBytes(); + dataOut.writeInt(msgBytes.length); + dataOut.write(msgBytes); + dataOut.flush(); + + System.out.println("[CLIENT]: Key sent to Node-" + msg.id); + } catch (IOException error) { + error.printStackTrace(); + } + } + } else { + System.out.println("[CLIENT]: Error.. cant send the key if not present"); + } + } + + /** + * Function to check if all keys for entering critical section is present + */ + public boolean checkKeys() { boolean hasAllKeys = true; - for (int i = 1; i <= node.totalNodes; i++) { - if (i == node.id) continue; - if (!node.keys.contains(i)) { - requestKey(i); - hasAllKeys = false; + synchronized (node) { + for (int i = 1; i <= node.totalNodes; i++) { + if (i == node.id) + continue; + if (!node.keys.contains(i)) { + requestKey(i); + hasAllKeys = false; + } } } return hasAllKeys; } -// --- + + // --- public void init() { Thread client = new Thread(() -> { System.out.println("[CLIENT] Starting..."); try { - if (node.id == 0) { - node.changeState(); + while (node.requestSent < node.maxRequest) { + try { + Thread.sleep(node.requestDelay); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + enterCS(); + synchronized (node) { + node.requestSent += 1; + } } - System.out.println("[CLIENT] Init Map Protocol..."); - node.client.mapProtocol(); + System.out.println("[CLIENT]: All request for CS has been sent"); } catch (Exception e) { e.printStackTrace(); } diff --git a/Message.java b/Message.java index 8ebd399..c3699bc 100644 --- a/Message.java +++ b/Message.java @@ -1,20 +1,19 @@ import java.io.*; -import java.util.*; enum MessageType { - REQUEST, REPLY + REQUEST, REPLY, BOTH, }; public class Message implements Serializable { public int id = -1; - public MessageType messageType; + public MessageType msgType; public int clock; - String key; - - public Message(MessageType type, int id, Vector timestamp) { - this.messageType = type; + public int key = -1; + + public Message(MessageType type, int id, int clock, int key) { + this.msgType = type; this.id = id; - this.clock = timestamp; + this.clock = clock; this.key = key; } diff --git a/Node.java b/Node.java index 1ab8915..b9d0f69 100644 --- a/Node.java +++ b/Node.java @@ -3,6 +3,7 @@ import java.net.Socket; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.*; public class Node { @@ -18,9 +19,10 @@ public class Node { // Variable int requestSent = 0; boolean under_cs = false; + boolean pending_req = false; boolean end_flag = false; Vector keys = new Vector<>(); - Vector clock = new Vector<>(); + int clock = 0; // Components Server server; @@ -39,9 +41,6 @@ public Node(int id) { public static void main(String[] args) { // Init Node Node node; - // if (args.length > 0) - // node = new Node(Integer.parseInt(args[0])); - // else node = new Node(-1); // Parse the config file node.readConfig(); @@ -49,6 +48,7 @@ public static void main(String[] args) { node.initKeys(); // Print details node.printNodeConfig(); + node.printNodeNeighbours(); node.printNodeKeys(); // Server @@ -143,7 +143,7 @@ public void initKeys() { this.keys.add(i); } } - + public String getHost() { return idToHost_PortMap.get(id).get(0); } @@ -172,6 +172,7 @@ public void printNodeConfig() { System.out.println("Max # of Request: " + maxRequest); System.out.println("=====================================\n"); } + public void printNodeKeys() { System.out.println("============= Node Keys ============="); for (Integer x : keys) { @@ -179,14 +180,12 @@ public void printNodeKeys() { } System.out.println("=====================================\n"); } - public void printNodeVectorClock() { - int totalSent = 0, totalReceive = 0; - System.out.println("========= Node Vector Clock ========="); - for (int i = 0; i < totalNodes; i++) { - System.out.println("NodeId: " + i + " | Msg: " + clock.get(i)); + + public void printNodeNeighbours() { + System.out.println("============= Node Neighbours ============="); + for (Integer x : keys) { + System.out.println("Node-" + x + " | " + getHost(x) + "::" + getPort(x)); } - System.out.println("Total Send: " + totalSent + " | Total Recieve: " + totalReceive + " | Diff: " - + (totalSent - totalReceive)); System.out.println("=====================================\n"); } diff --git a/Server.java b/Server.java index 736dd0d..3652f4b 100644 --- a/Server.java +++ b/Server.java @@ -15,11 +15,53 @@ public void handleMessage(Message msg) { if (msg.id != -1) System.out.println("[SERVER] Message received from Node " + msg.id); // Message Handler - - if (msg.messageType == MessageType.REQUEST) { - // Handle incoming Request for key. - } else if (msg.messageType == MessageType.REPLY) { + if (msg.msgType == MessageType.REQUEST) { + // Handle incoming Request for key. + if (node.under_cs) { + // Adding to pending request. + synchronized (node) { + node.pendingRequest.put(msg.id, msg); + } + } else if (node.pending_req) { + // This is for when the request for CS is send + if (node.clock > msg.clock) { + // Send the key + node.client.sendKey(msg, true); + } else if (node.clock == msg.clock) { + if (node.id < msg.id) { + // Add to pending + synchronized (node) { + node.pendingRequest.put(msg.id, msg); + } + } else { + // Send the key + node.client.sendKey(msg, true); + } + } else { + // Add to pending + synchronized (node) { + node.pendingRequest.put(msg.id, msg); + } + } + } else { + // Sending the key + node.client.sendKey(msg, false); + } + } else if (msg.msgType == MessageType.REPLY) { // Handle incoming Reply with key. + if (!node.keys.contains(msg.key)) { + System.out.println("[SERVER]: Key " + msg.key + " received from Node-" + msg.id); + synchronized (node) { + node.keys.add(msg.key); + } + } else { + System.out.println("[SERVER]: Key " + msg.key + " somehow alrready exists!!"); + } + } else if (msg.msgType == MessageType.BOTH) { + synchronized (node) { + node.keys.add(msg.key); + node.pendingRequest.put(msg.id, msg); + } } } @@ -38,7 +80,6 @@ public void listen() { DataInputStream dataInputStream = new DataInputStream(clientInputStream); while (!client.isClosed()) { - try { // Reading Incoming Message. int length = dataInputStream.readInt();