// async_tcp_client.cpp
#include <unistd.h>
#include <deque>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
using boost::asio::deadline_timer;
using boost::asio::ip::tcp;
class client
{
public:
client(boost::asio::io_service& io_service)
: stopped_(false),
socket_(io_service),
deadline_(io_service),
heartbeat_timer_(io_service),
input_(io_service, ::dup(STDIN_FILENO)),
console_buffer_(100000)
{
}
// Called by the user of the client class to initiate the connection process.
// The endpoint iterator will have been obtained using a tcp::resolver.
void start(tcp::resolver::iterator endpoint_iter)
{
// Start the connect actor.
start_connect(endpoint_iter);
// Start the read console actor
start_read_console();
// Start the deadline actor. You will note that we're not setting any
// particular deadline here. Instead, the connect and input actors will
// update the deadline prior to each asynchronous operation.
deadline_.async_wait(boost::bind(&client::check_deadline, this));
}
// This function terminates all the actors to shut down the connection. It
// may be called by the user of the client class, or by the class itself in
// response to graceful termination or an unrecoverable error.
void stop()
{
stopped_ = true;
boost::system::error_code ignored_ec;
socket_.close(ignored_ec);
deadline_.cancel();
heartbeat_timer_.cancel();
}
private:
void start_connect(tcp::resolver::iterator endpoint_iter)
{
if (endpoint_iter != tcp::resolver::iterator())
{
std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
// Set a deadline for the connect operation.
deadline_.expires_from_now(boost::posix_time::seconds(60));
// Start the asynchronous connect operation.
socket_.async_connect(endpoint_iter->endpoint(),
boost::bind(&client::handle_connect,
this, _1, endpoint_iter));
}
else
{
// There are no more endpoints to try. Shut down the client.
stop();
}
}
void handle_connect(const boost::system::error_code& ec,
tcp::resolver::iterator endpoint_iter)
{
if (stopped_)
return;
// The async_connect() function automatically opens the socket at the start
// of the asynchronous operation. If the socket is closed at this time then
// the timeout handler must have run first.
if (!socket_.is_open())
{
std::cout << "Connect timed out\n";
// Try the next available endpoint.
start_connect(++endpoint_iter);
}
// Check if the connect operation failed before the deadline expired.
else if (ec)
{
std::cout << "Connect error: " << ec.message() << "\n";
// We need to close the socket used in the previous connection attempt
// before starting a new one.
socket_.close();
// Try the next available endpoint.
start_connect(++endpoint_iter);
}
// Otherwise we have successfully established a connection.
else
{
std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
// Start the input actor.
start_read();
// Start the heartbeat actor.
start_write();
}
}
void start_read()
{
// Set a deadline for the read operation.
deadline_.expires_from_now(boost::posix_time::seconds(30));
// Start an asynchronous operation to read a newline-delimited message.
boost::asio::async_read_until(socket_, input_buffer_, '\n',
boost::bind(&client::handle_read, this, _1));
}
void handle_read(const boost::system::error_code& ec)
{
if (stopped_)
return;
if (!ec)
{
// Extract the newline-delimited message from the buffer.
std::string line;
std::istream is(&input_buffer_);
std::getline(is, line);
// Empty messages are heartbeats and so ignored.
if (!line.empty())
{
std::cout << "Received: " << line << "\n";
}
start_read();
}
else
{
std::cout << "Error on receive: " << ec.message() << "\n";
stop();
}
}
void start_write()
{
if (stopped_)
return;
// Start an asynchronous operation to send a heartbeat message.
boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
boost::bind(&client::handle_write, this, _1));
}
void handle_write(const boost::system::error_code& ec)
{
if (stopped_)
return;
if (!ec)
{
// Wait 10 seconds before sending the next heartbeat.
heartbeat_timer_.expires_from_now(boost::posix_time::seconds(10));
heartbeat_timer_.async_wait(boost::bind(&client::start_write, this));
}
else
{
std::cout << "Error on heartbeat: " << ec.message() << "\n";
stop();
}
}
void check_deadline()
{
if (stopped_)
return;
// Check whether the deadline has passed. We compare the deadline against
// the current time since a new asynchronous operation may have moved the
// deadline before this actor had a chance to run.
if (deadline_.expires_at() <= deadline_timer::traits_type::now())
{
// The deadline has passed. The socket is closed so that any outstanding
// asynchronous operations are cancelled.
socket_.close();
// There is no longer an active deadline. The expiry is set to positive
// infinity so that the actor takes no action until a new deadline is set.
deadline_.expires_at(boost::posix_time::pos_infin);
}
// Put the actor back to sleep.
deadline_.async_wait(boost::bind(&client::check_deadline, this));
}
void start_read_console()
{
boost::asio::async_read_until(input_, console_buffer_, '\n',
boost::bind(&client::handle_read_console, this, _1, _2));
}
void handle_read_console(const boost::system::error_code& ec, std::size_t length)
{
if (stopped_)
return;
if (!ec)
{
// Extract the newline-delimited message from the buffer.
std::string line, terminated_line;
std::istream is(&console_buffer_);
std::getline(is, line);
// Empty messages are heartbeats and so ignored.
if (!line.empty())
{
std::cout << "Sending: " << line << "\n";
terminated_line = line + std::string("\n");
std::size_t n = terminated_line.size();
terminated_line.copy(send_buffer_, n);
boost::asio::async_write(socket_, boost::asio::buffer(send_buffer_,n),
boost::bind(&client::handle_send, this, _1, _2));
}
/*if (length != 0)
{
boost::asio::async_write(socket_, console_buffer_,
boost::bind(&client::handle_send, this, _1, _2));
}*/
start_read_console();
}
else
{
std::cout << "Error on handle_read_console: " << ec.message() << "\n";
stop();
}
}
void handle_send(const boost::system::error_code& ec, std::size_t length)
{
if (stopped_)
return;
if (!ec)
{
std::cout << "Sent " << length << " bytes" << std::endl;
}
else
{
std::cout << "Error on handle_send: " << ec.message() << "\n";
stop();
}
}
private:
bool stopped_;
tcp::socket socket_;
boost::asio::streambuf input_buffer_;
deadline_timer deadline_;
deadline_timer heartbeat_timer_;
boost::asio::posix::stream_descriptor input_;
boost::asio::streambuf console_buffer_;
enum {max_length=1024};
char send_buffer_[max_length];
};
// Entry point: `client <host> <port>`.
// Resolves the host/port pair, starts the client and runs the io_service
// until it has no more work. Returns 0 on a normal exit and 1 on a usage
// error or when an exception escapes the event loop, so callers can detect
// failure from the exit status.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: client <host> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;
    tcp::resolver r(io_service);
    client c(io_service);

    c.start(r.resolve(tcp::resolver::query(argv[1], argv[2])));

    io_service.run();
  }
  catch (const std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
    // Report the failure to the caller instead of exiting with success.
    return 1;
  }

  return 0;
}