Skip to content

Commit

Permalink
Updated Client and added functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo-Malay committed Oct 25, 2024
1 parent b31bf08 commit 3aa6870
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 123 deletions.
179 changes: 81 additions & 98 deletions Client.java
Original file line number Diff line number Diff line change
@@ -1,132 +1,69 @@
import java.util.*;
import java.io.*;
import java.net.*;

public class Client {

List<Socket> channelList;
Node node;

public Client(Node node) {
this.node = node;
synchronized (node) {
this.channelList = connectChannels(node);
connectChannels(node);
}
}

public List<Socket> connectChannels(Node node) {
public void connectChannels(Node node) {
System.out.println("[CLIENT] Making channel array...");
List<Socket> channelList = new ArrayList<>();
for (int i = 1; i <= node.totalNodes; i++)
{
if (i == node.id) continue;
for (int i = 1; i <= node.totalNodes; i++) {
if (i == node.id)
continue;
String host = node.getHost(i);
int port = node.getPort(i);
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
channelList.add(client);
node.idToChannelMap.put(node.hostToId_PortMap.get(host).get(0), client);
System.out.println("[CLIENT] Connected to " + host + ":" + port);
} catch (IOException error) {
System.out.println("[CLIENT] Unable to connect to " + host + ":" + port);
}
}
return channelList;
}

public void mapProtocol() {
while (true) {
if (node.msgSent >= node.maxNumber) {
System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) permanently...");
node.state = false;
sendCustomEnd();
node.pem_passive = true;
break;
}

if (node.state == true) {
Random random = new Random();
int count = random.nextInt(node.maxPerActive - node.minPerActive + 1) + node.minPerActive;
synchronized (node) {
sendBulkMsg(count);
}
} else {
try {
System.out.println("[CLIENT] Node state (ACTIVE -> PASSIVE) after sending " + node.msgSent + "/"
+ node.maxNumber + " messages");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void enterCS() {
// Start critical section
synchronized (node) {
node.under_cs = true;
}
executeCS();
}

public void sendBulkMsg(int count) {

// System.out.println(String.format("Sending %d messages to neighbours...",
// count));
Random random = new Random();

for (int i = 0; i < count; i++) {
if (node.msgSent >= node.maxNumber) {
node.state = false;
break;
}
int randomNumber = random.nextInt(this.channelList.size());
int destination = node.neighbours.get(node.id).get(randomNumber);
// System.out.println("Sent a message to " + destination);
node.sndClk.set(destination, node.sndClk.get(destination) + 1);
Socket channel = channelList.get(randomNumber);
String messageString = "Hello " + node.name + " (" + node.msgSent + 1 + "/" + node.maxNumber + ")";
Message msg = new Message(node.id, node.clock, messageString);
Client.sendMsg(msg, channel, node);

try {
Thread.sleep(node.minSendDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
/** Function to perform critical section */
public void executeCS() {
try {
Thread.sleep(node.executionTime);
leaveCS();
} catch (InterruptedException e) {
e.printStackTrace();
}
node.changeState();

}

public static void sendMsg(Message msg, Socket channel, Node node) {
try {
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);

byte[] msgBytes = msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes); // Send message
dataOut.flush();

if (msg.messageType == MessageType.APPLICATION) {
int prevEntry = node.clock.get(node.id);
node.clock.set(node.id, prevEntry + 1);
node.msgSent++;
}
} catch (IOException error) {
error.printStackTrace();
public void leaveCS() {
synchronized (node) {
node.under_cs = false;
}
}

// ---
public static void csEnter() {}
public static void executeCS() {}
public static void csLeave() {}

public static void requestKeys(int nodeId) {
/** Function to request key from respective process */
public void requestKey(int nodeId) {
// Send Msg to Specific Channel.
Socket channel = node.idToChannelMap.get(nodeId);
Message req_msg = new Message(MessageType.REQUEST, node.id, 0, -1);
try {
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);

byte[] msgBytes = msg.toMessageBytes();
byte[] msgBytes = req_msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes);
dataOut.flush();
Expand All @@ -135,27 +72,73 @@ public static void requestKeys(int nodeId) {
}
}

public static boolean checkKeys() {
/** Function to send the key to the respective process */
public void sendKey(Message msg, boolean needBack) {
if (node.keys.contains(msg.key)) {
synchronized (node) {
try {
while (node.keys.contains(msg.key)) {
node.keys.remove(msg.key);
}

Message reply_msg = new Message(needBack ? MessageType.BOTH : MessageType.REPLY, node.id, 0,
node.id);
Socket channel = node.idToChannelMap.get(msg.id);

OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);

byte[] msgBytes = reply_msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes);
dataOut.flush();

System.out.println("[CLIENT]: Key sent to Node-" + msg.id);
} catch (IOException error) {
error.printStackTrace();
}
}
} else {
System.out.println("[CLIENT]: Error.. cant send the key if not present");
}
}

/**
* Function to check if all keys for entering critical section is present
*/
public boolean checkKeys() {
boolean hasAllKeys = true;
for (int i = 1; i <= node.totalNodes; i++) {
if (i == node.id) continue;
if (!node.keys.contains(i)) {
requestKey(i);
hasAllKeys = false;
synchronized (node) {
for (int i = 1; i <= node.totalNodes; i++) {
if (i == node.id)
continue;
if (!node.keys.contains(i)) {
requestKey(i);
hasAllKeys = false;
}
}
}
return hasAllKeys;
}
// ---

// ---
public void init() {
Thread client = new Thread(() -> {
System.out.println("[CLIENT] Starting...");
try {
if (node.id == 0) {
node.changeState();
while (node.requestSent < node.maxRequest) {
try {
Thread.sleep(node.requestDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}

enterCS();
synchronized (node) {
node.requestSent += 1;
}
}
System.out.println("[CLIENT] Init Map Protocol...");
node.client.mapProtocol();
System.out.println("[CLIENT]: All request for CS has been sent");
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
15 changes: 7 additions & 8 deletions Message.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import java.io.*;
import java.util.*;

enum MessageType {
REQUEST, REPLY
REQUEST, REPLY, BOTH,
};

public class Message implements Serializable {
public int id = -1;
public MessageType messageType;
public MessageType msgType;
public int clock;
String key;
public Message(MessageType type, int id, Vector<Integer> timestamp) {
this.messageType = type;
public int key = -1;

public Message(MessageType type, int id, int clock, int key) {
this.msgType = type;
this.id = id;
this.clock = timestamp;
this.clock = clock;
this.key = key;
}

Expand Down
23 changes: 11 additions & 12 deletions Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.*;

public class Node {
Expand All @@ -18,9 +19,10 @@ public class Node {
// Variable
int requestSent = 0;
boolean under_cs = false;
boolean pending_req = false;
boolean end_flag = false;
Vector<Integer> keys = new Vector<>();
Vector<Integer> clock = new Vector<>();
int clock = 0;

// Components
Server server;
Expand All @@ -39,16 +41,14 @@ public Node(int id) {
public static void main(String[] args) {
// Init Node
Node node;
// if (args.length > 0)
// node = new Node(Integer.parseInt(args[0]));
// else
node = new Node(-1);
// Parse the config file
node.readConfig();
// Init Keys
node.initKeys();
// Print details
node.printNodeConfig();
node.printNodeNeighbours();
node.printNodeKeys();

// Server
Expand Down Expand Up @@ -143,7 +143,7 @@ public void initKeys() {
this.keys.add(i);
}
}

public String getHost() {
return idToHost_PortMap.get(id).get(0);
}
Expand Down Expand Up @@ -172,21 +172,20 @@ public void printNodeConfig() {
System.out.println("Max # of Request: " + maxRequest);
System.out.println("=====================================\n");
}

public void printNodeKeys() {
System.out.println("============= Node Keys =============");
for (Integer x : keys) {
System.out.print(x + ", ");
}
System.out.println("=====================================\n");
}
public void printNodeVectorClock() {
int totalSent = 0, totalReceive = 0;
System.out.println("========= Node Vector Clock =========");
for (int i = 0; i < totalNodes; i++) {
System.out.println("NodeId: " + i + " | Msg: " + clock.get(i));

public void printNodeNeighbours() {
System.out.println("============= Node Neighbours =============");
for (Integer x : keys) {
System.out.println("Node-" + x + " | " + getHost(x) + "::" + getPort(x));
}
System.out.println("Total Send: " + totalSent + " | Total Recieve: " + totalReceive + " | Diff: "
+ (totalSent - totalReceive));
System.out.println("=====================================\n");
}

Expand Down
Loading

0 comments on commit 3aa6870

Please sign in to comment.