-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClient.java
192 lines (174 loc) · 6.52 KB
/
Client.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import java.io.*;
import java.net.*;
public class Client {
Node node;
public Client(Node node) {
this.node = node;
synchronized (node) {
connectChannels(node);
}
}
public void connectChannels(Node node) {
System.out.println("[CLIENT] Making channel array...");
for (int i = 0; i < node.totalNodes; i++) {
if (i == node.id)
continue;
String host = node.getHost(i);
int port = node.getPort(i);
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
node.idToChannelMap.put(node.hostToId_PortMap.get(host).get(0), client);
System.out.println("[CLIENT] Connected to " + host + ":" + port);
} catch (IOException error) {
System.out.println("[CLIENT] Unable to connect to " + host + ":" + port);
}
}
}
public void enterCS() {
// Start critical section
synchronized (node) {
node.under_cs = true;
node.exp.recordStart();
}
// Check for keys until has all the keys
// System.out.println("[CLIENT] Checking if all the keys are present or not");
boolean send_req = true;
while (!checkKeys(send_req)) {
send_req = false;
}
// Entering the CS
System.out.println("[CLIENT] All Keys are present. Entering the CS");
executeCS();
}
/** Function to perform critical section */
public void executeCS() {
try {
// Saving the state to file.
node.writeState();
// int wait = 1 + (int) (Math.random() * (node.executionTime));
// Thread.sleep(wait);
Thread.sleep(node.executionTime);
node.exp.recordEnd();
leaveCS();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void leaveCS() {
// Critical Section is done.
synchronized (node) {
node.clock = Math.max(node.clock, node.pendingClock) + 1;
node.under_cs = false;
node.pending_req = false;
}
// Sending the keys to all the neighbours
node.pendingRequest.forEach((key, value) -> {
if (value != null) {
node.client.sendKey(value, false);
}
});
// Sent all the keys...clearing the queue
synchronized (node) {
node.pendingRequest.clear();
}
}
/** Function to request key from respective process */
public void requestKey(int nodeId) {
// Send Msg to Specific Channel.
Socket channel = node.idToChannelMap.get(nodeId);
Message req_msg = new Message(MessageType.REQUEST, node.id, node.clock, -1);
try {
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);
byte[] msgBytes = req_msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes);
dataOut.flush();
node.exp.totalMessages += 1;
System.out.println("[CLIENT] Sent request to node-" + nodeId + " for key.");
} catch (IOException error) {
error.printStackTrace();
}
}
/** Function to send the key to the respective process */
public void sendKey(Message msg, boolean needBack) {
if (node.keys.contains(msg.id)) {
synchronized (node) {
try {
node.keys.remove(Integer.valueOf(msg.id));
// Asking for key back if required
Message reply_msg = new Message(needBack ? MessageType.BOTH : MessageType.REPLY, node.id,
node.clock,
node.id);
Socket channel = node.idToChannelMap.get(msg.id);
OutputStream outStream = channel.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(outStream);
byte[] msgBytes = reply_msg.toMessageBytes();
dataOut.writeInt(msgBytes.length);
dataOut.write(msgBytes);
dataOut.flush();
// System.out.println("[CLIENT] Key sent to node-" + msg.id);
} catch (IOException error) {
error.printStackTrace();
}
}
} else {
System.out.println("[CLIENT] Error.. can not send the key if not present");
}
}
/**
* Function to check if all keys for entering critical section is present
*/
public boolean checkKeys(boolean send_req) {
boolean hasAllKeys = true;
// synchronized (node) {
for (int i = 0; i < node.totalNodes; i++) {
if (i == node.id)
continue;
if (!node.keys.contains(i)) {
if (hasAllKeys && node.pending_req == false) {
synchronized (node) {
node.clock += 1;
node.pending_req = true;
}
}
if (send_req)
requestKey(i);
hasAllKeys = false;
}
}
// }
return hasAllKeys;
}
// ---
public void init() {
Thread client = new Thread(() -> {
System.out.println("[CLIENT] Starting...");
try {
while (node.requestSent < node.maxRequest) {
if (node.pending_req || node.under_cs)
continue;
try {
// int wait = 1 + (int) (Math.random() * (node.requestDelay));
// Thread.sleep(wait);
Thread.sleep(node.requestDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
enterCS();
synchronized (node) {
node.requestSent += 1;
}
System.out.println("[CLIENT] Request for CS #" + node.requestSent + " is completed");
}
node.exp.write();
System.out.println("[CLIENT] All request for CS has been sent");
} catch (Exception e) {
e.printStackTrace();
}
});
client.start();
}
}