Skip to content

Commit

Permalink
Added Functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo-Malay committed Oct 24, 2024
1 parent cbb00aa commit b31bf08
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 36 deletions.
57 changes: 38 additions & 19 deletions Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ public Client(Node node) {
public List<Socket> connectChannels(Node node) {
System.out.println("[CLIENT] Making channel array...");
List<Socket> channelList = new ArrayList<>();
for (Integer neighbour : node.neighbours.get(node.id)) {
String host = node.getHost(neighbour);
int port = node.getPort(neighbour);
for (int i = 1; i <= node.totalNodes; i++)
{
if (i == node.id) continue;
String host = node.getHost(i);
int port = node.getPort(i);
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
Expand Down Expand Up @@ -92,22 +94,6 @@ public void sendBulkMsg(int count) {

}

public void sendCustomEnd() {
for (Socket channel : this.channelList) {
Message msg = new Message(node.id, node.id);
Client.sendMsg(msg, channel, node);
}

try {
node.pem_passive = true;
Thread.sleep(node.minSendDelay);
if (node.pem_passive && node.custom_end == node.neighbours.get(node.id).size())
node.printNodeVectorClock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void sendMsg(Message msg, Socket channel, Node node) {
try {
OutputStream outStream = channel.getOutputStream();
Expand All @@ -128,6 +114,39 @@ public static void sendMsg(Message msg, Socket channel, Node node) {
}
}

// ---
public static void csEnter() {}
public static void executeCS() {}
public static void csLeave() {}

public static void requestKeys(int nodeId) {
// Send Msg to Specific Channel.
Socket channel = node.idToChannelMap.get(nodeId);
try {
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);

byte[] msgBytes = msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes);
dataOut.flush();
} catch (IOException error) {
error.printStackTrace();
}
}

public static boolean checkKeys() {
boolean hasAllKeys = true;
for (int i = 1; i <= node.totalNodes; i++) {
if (i == node.id) continue;
if (!node.keys.contains(i)) {
requestKey(i);
hasAllKeys = false;
}
}
return hasAllKeys;
}
// ---
public void init() {
Thread client = new Thread(() -> {
System.out.println("[CLIENT] Starting...");
Expand Down
20 changes: 3 additions & 17 deletions Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class Node {
// Variable
int requestSent = 0;
boolean under_cs = false;
boolean end_flag = false;
Vector<Integer> keys = new Vector<>();
Vector<Integer> clock = new Vector<>();

Expand All @@ -29,6 +30,7 @@ public class Node {
Map<String, List<Integer>> hostToId_PortMap = new HashMap<>();
Map<Integer, List<String>> idToHost_PortMap = new HashMap<>();
Map<Integer, Socket> idToChannelMap = new HashMap<>();
ConcurrentHashMap<Integer, Message> pendingRequest;

public Node(int id) {
this.id = id;
Expand Down Expand Up @@ -141,6 +143,7 @@ public void initKeys() {
this.keys.add(i);
}
}

public String getHost() {
return idToHost_PortMap.get(id).get(0);
}
Expand All @@ -157,22 +160,6 @@ public int getPort(int id) {
return Integer.parseInt(idToHost_PortMap.get(id).get(1));
}

public void initVectorClock() {
for (int i = 0; i < totalNodes; i++) {
this.clock.add(0);
}
}

/**
* This function is for Entering the critical section.
*/
public void enterCS(){}

/**
* This function is for Leaving the critical section.
*/
public void leaveCS(){}

/* ========== HELPER FUNCTIONS ========== */
public void printNodeConfig() {
System.out.println("============ Node Config ============");
Expand All @@ -192,7 +179,6 @@ public void printNodeKeys() {
}
System.out.println("=====================================\n");
}

public void printNodeVectorClock() {
int totalSent = 0, totalReceive = 0;
System.out.println("========= Node Vector Clock =========");
Expand Down

0 comments on commit b31bf08

Please sign in to comment.