Skip to content

Commit

Permalink
Don't Send on Socket if Identity Frame was not Sent (#75)
Browse files Browse the repository at this point in the history
* fixes #63 thanks to @andreasjunghans and his supurb docs investigation

* update comments
  • Loading branch information
kevinkreiser authored Feb 24, 2019
1 parent 9a8ac0e commit 14c7592
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
2 changes: 1 addition & 1 deletion prime_server/prime_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//some version info
#define PRIME_SERVER_VERSION_MAJOR 0
#define PRIME_SERVER_VERSION_MINOR 6
#define PRIME_SERVER_VERSION_PATCH 3
#define PRIME_SERVER_VERSION_PATCH 4

#include <functional>
#include <string>
Expand Down
74 changes: 44 additions & 30 deletions src/prime_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ namespace prime_server {
if(request.second == 0)
break;
//send the stuff on
server.send(static_cast<const void*>(identity), identity_size, ZMQ_SNDMORE);
server.send(request.first, request.second, 0);
if(!server.send(static_cast<const void*>(identity), identity_size, ZMQ_SNDMORE) ||
!server.send(request.first, request.second, 0))
logging::ERROR("Client failed to send request");
}
catch(const std::exception& e) {
logging::ERROR(std::string(__FILE__) + ":" + std::to_string(__LINE__) + " client_t: " + e.what());
Expand Down Expand Up @@ -200,13 +201,16 @@ namespace prime_server {
}//actual request data
else {
if(session != sessions.end()) {
//proxy any whole bits onward, or if that failed (malformed or large request) close the session
//attempt to proxy any whole bits onward and if it failed (malformed or large request) close the session
if(!enqueue(session->first, body, session->second)) {
client.send(session->first, ZMQ_SNDMORE | ZMQ_DONTWAIT);
client.send(static_cast<const void*>(""), 0, ZMQ_DONTWAIT);
for(auto id_time_stamp : session->second.enqueued)
interrupt.send(static_cast<void*>(&id_time_stamp), sizeof(id_time_stamp), ZMQ_DONTWAIT);
sessions.erase(session);
//if sending the identity frame (session->first) fails we cannot send the disconnect or it will hang the entire socket
if(client.send(session->first, ZMQ_SNDMORE | ZMQ_DONTWAIT) && client.send(static_cast<const void*>(""), 0, ZMQ_DONTWAIT)) {
for(auto id_time_stamp : session->second.enqueued)
interrupt.send(static_cast<void*>(&id_time_stamp), sizeof(id_time_stamp), ZMQ_DONTWAIT);
sessions.erase(session);
}
else
logging::ERROR("Server failed to disconnect client after rejecting request");
}
}
else
Expand All @@ -222,9 +226,10 @@ namespace prime_server {
parsed_requests = request.from_stream(static_cast<const char*>(message.data()), message.size(), max_request_size);
}//something went wrong, either in parsing or size limitation
catch(const typename request_container_t::request_exception_t& e) {
client.send(requester, ZMQ_SNDMORE | ZMQ_DONTWAIT);
client.send(e.response, ZMQ_DONTWAIT);
if(log) {
//if sending the identify frame fails (requester) we cannot send the error response or it will hang the entire socket
if(!client.send(requester, ZMQ_SNDMORE | ZMQ_DONTWAIT) || !client.send(e.response, ZMQ_DONTWAIT))
logging::ERROR("Server failed to send rejection response");
else if(log) {
request.log(request_id);
e.log(request_id);
}
Expand All @@ -237,14 +242,17 @@ namespace prime_server {
//figure out if we are expecting to close this request or not
auto info = parsed_request.to_info(request_id++);
//send on the request
this->proxy.send(static_cast<const void*>(&info), sizeof(info), ZMQ_DONTWAIT | ZMQ_SNDMORE);
this->proxy.send(parsed_request.to_string(), ZMQ_DONTWAIT);
if(!proxy.send(static_cast<const void*>(&info), sizeof(info), ZMQ_DONTWAIT | ZMQ_SNDMORE) ||
!proxy.send(parsed_request.to_string(), ZMQ_DONTWAIT)) {
logging::ERROR("Server failed to enqueue request");
return false;
}
if(log)
parsed_request.log(info.id);
//remember we are working on it
request.enqueued.emplace_back(static_cast<typename decltype(requests)::key_type>(info));
this->requests.emplace(request.enqueued.back(), requester);
this->request_history.emplace_back(std::move(info));
requests.emplace(request.enqueued.back(), requester);
request_history.emplace_back(std::move(info));
}
return true;
}
Expand All @@ -255,15 +263,15 @@ namespace prime_server {
auto request = requests.find(static_cast<typename decltype(requests)::key_type>(info));
if(request == requests.cend())
return false;
//reply to the client with the response or an error
client.send(request->second, ZMQ_SNDMORE | ZMQ_DONTWAIT);
client.send(response, ZMQ_DONTWAIT);
if(log)
//reply to the client with the response or an error however, if sending the identity frame failed
//we cannot send the response/error because it will hang the entire socket
if(!client.send(request->second, ZMQ_SNDMORE | ZMQ_DONTWAIT) || !client.send(response, ZMQ_DONTWAIT))
logging::ERROR("Server failed to dequeue request");
else if(log)
info.log(response.size());
//cleanup, session may or may not be keep alive
if(!info.keep_alive()){
this->client.send(request->second, ZMQ_DONTWAIT | ZMQ_SNDMORE);
this->client.send(static_cast<const void*>(""), 0, ZMQ_DONTWAIT);
//cleanup and if its not keep alive close the session
//if sending the identity frame fails, we cannot send the disconnect message or it will hang the entire socket
if(!info.keep_alive() && client.send(request->second, ZMQ_DONTWAIT | ZMQ_SNDMORE) && client.send(static_cast<const void*>(""), 0, ZMQ_DONTWAIT)){
auto session = sessions.find(request->second);
for(auto id_time_stamp : session->second.enqueued)
interrupt.send(static_cast<void*>(&id_time_stamp), sizeof(id_time_stamp), ZMQ_DONTWAIT);
Expand Down Expand Up @@ -341,8 +349,10 @@ namespace prime_server {
hb_itr = heart_beats.find(heart_beat);
}
//send it on to the first bored worker
downstream.send(hb_itr->second, ZMQ_DONTWAIT | ZMQ_SNDMORE);
downstream.send_all(messages, ZMQ_DONTWAIT);
//TODO: if sending fails we need to try the next worker
if(!downstream.send(hb_itr->second, ZMQ_DONTWAIT | ZMQ_SNDMORE) ||
!downstream.send_all(messages, ZMQ_DONTWAIT))
logging::ERROR("Failed to forward job to worker");
//they are dead to us until they report back
auto worker_itr = workers.find(hb_itr->second);
fifo.erase(worker_itr->second);
Expand Down Expand Up @@ -409,18 +419,22 @@ namespace prime_server {
heart_beat = std::move(result.heart_beat);
//should we send this on to the next proxy
if(result.intermediate) {
downstream_proxy.send(request_info, ZMQ_SNDMORE);
downstream_proxy.send_all(result.messages, 0);
//TODO: retry?
if(!downstream_proxy.send(request_info, ZMQ_SNDMORE) ||
!downstream_proxy.send_all(result.messages, 0))
logging::ERROR("Worker failed to forward intermediate result");
}//or are we done
else if(result.messages.size() != 0) {
loopback.send(request_info, ZMQ_SNDMORE);
if(result.messages.size() > 1) {
logging::WARN("Sending more than one result message over the loopback will result in additional parts being dropped");
result.messages.resize(1);
}
if(result.messages.back().size() == 0)
if(result.messages.back().empty())
logging::WARN("Sending empty messages will disconnect the client");
loopback.send_all(result.messages, 0);
//TODO: retry
if(!loopback.send(request_info, ZMQ_SNDMORE) ||
!loopback.send_all(result.messages, 0))
logging::ERROR("Worker failed to forward final result");
}//an empty result is no good
else {
logging::ERROR("At least one result message is required for the loopback");
Expand Down

0 comments on commit 14c7592

Please sign in to comment.