Skip to content

Commit

Permalink
Merge branch 'master' into 2-8-stable
Browse files Browse the repository at this point in the history
  • Loading branch information
localshred committed Jul 3, 2013
2 parents 4f0535a + 4c6a30e commit 9bde318
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 83 deletions.
1 change: 0 additions & 1 deletion lib/protobuf/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class CLI < ::Thor
option :evented, :type => :boolean, :aliases => %w(-m), :desc => 'Evented Mode for server and client connections (uses EventMachine).'
option :zmq, :type => :boolean, :aliases => %w(-z), :desc => 'ZeroMQ Socket Mode for server and client connections.'

option :beacon_address, :type => :string, :desc => 'Broadcast beacons to this address (defaul: value of ServiceDirectory.address)'
option :beacon_interval, :type => :numeric, :desc => 'Broadcast beacons every N seconds. (default: 5)'
option :beacon_port, :type => :numeric, :desc => 'Broadcast beacons to this port (default: value of ServiceDirectory.port)'
option :broadcast_beacons, :type => :boolean, :desc => 'Broadcast beacons for dynamic discovery (Currently only available with ZeroMQ).'
Expand Down
1 change: 0 additions & 1 deletion lib/protobuf/rpc/connectors/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def send_data
@socket.flush
log_debug { sign_message("write closed") }
end

end
end
end
Expand Down
123 changes: 51 additions & 72 deletions lib/protobuf/rpc/connectors/zmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module Rpc
module Connectors
class Zmq < Base

RequestTimeout = Class.new(RuntimeError)

##
# Included Modules
#
Expand All @@ -23,7 +25,11 @@ class Zmq < Base
# Class Methods
#
def self.zmq_context
@zmq_context ||= ZMQ::Context.new
@zmq_contexts ||= Hash.new { |hash, key|
hash[key] = ZMQ::Context.new
}

@zmq_contexts[Process.pid]
end

##
Expand All @@ -37,9 +43,7 @@ def self.zmq_context
#
def send_request
setup_connection
poll_send_data
ensure
close_connection
send_request_with_lazy_pirate
end

def log_signature
Expand All @@ -53,23 +57,23 @@ def log_signature
#

def close_connection
socket_close
# The socket is automatically closed after every request.
end

# Establish a request socket connection to the remote rpc_server.
# Set the socket option LINGER to 0 so that we don't wait
# for queued messages to be accepted when the socket/context are
# asked to close/terminate.
#
def connect_to_rpc_server
return if error?

# Create a socket connected to a server that can handle the current
# service. The LINGER is set to 0 so we can close immediately in
# the event of a timeout
def create_socket
server_uri = lookup_server_uri
log_debug { sign_message("Establishing connection: #{server_uri}") }

socket = zmq_context.socket(::ZMQ::REQ)
socket.setsockopt(::ZMQ::LINGER, 0)

log_debug { sign_message("Establishing connection: #{server_uri}") }
zmq_error_check(socket.connect(server_uri), :socket_connect)
zmq_error_check(poller.register_readable(socket), :poller_register_readable)
log_debug { sign_message("Connection established to #{server_uri}") }

socket
end

# Method to determine error state, must be used with Connector API.
Expand Down Expand Up @@ -97,53 +101,44 @@ def lookup_server_uri
# If we haven't received a legitimate response in the CLIENT_RETRIES number
# of retries, fail the request.
#
def poll_send_data
return if error?

poll_timeout = (options[:timeout].to_f / CLIENT_RETRIES.to_f) * 1000

CLIENT_RETRIES.times do |n|
connect_to_rpc_server
log_debug { sign_message("Sending Request (attempt #{n + 1}, #{socket})") }
send_data
log_debug { sign_message("Request sending complete (attempt #{n + 1}, #{socket})") }

if poller.poll(poll_timeout) == 1
read_response
return
else
socket_close
end
end

fail(:RPC_FAILED, "The server took longer than #{options[:timeout]} seconds to respond")
end
def send_request_with_lazy_pirate
attempt = 0
timeout = options[:timeout].to_f
@stats.request_size = @request_data.size

def poller
@poller ||= ::ZMQ::Poller.new
begin
attempt += 1
send_request_with_timeout(timeout, attempt)
parse_response
rescue RequestTimeout
retry if attempt < CLIENT_RETRIES
fail(:RPC_FAILED, "The server repeatedly failed to respond within #{timeout} seconds")
rescue => e
fail(:RPC_FAILED, "Unexpected error sending request: #{e}")
end
end

# Read the string response from the available readable. This will be
# the current @socket. Calls `parse_response` to invoke the success or
# failed callbacks, depending on the state of the communication
# and response data.
#
def read_response
return if error?

@response_data = ''
zmq_error_check(socket.recv_string(@response_data), :socket_recv_string)

parse_response
end
def send_request_with_timeout(timeout, attempt = 0)
socket = create_socket

# Send the request data to the remote rpc_server.
#
def send_data
return if error?
poller = ::ZMQ::Poller.new
poller.register_readable(socket)

@stats.request_size = @request_data.size
log_debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") }
zmq_error_check(socket.send_string(@request_data), :socket_send_string)
log_debug { sign_message("Waiting #{timeout} seconds for response (attempt #{attempt}, #{socket})") }

if poller.poll(timeout * 1000) == 1
zmq_error_check(socket.recv_string(@response_data = ""), :socket_recv_string)
log_debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
else
log_debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
raise RequestTimeout
end
ensure
log_debug { sign_message("Closing Socket") }
zmq_error_check(socket.close, :socket_close)
log_debug { sign_message("Socket closed") }
end

# The service we're attempting to connect to
Expand All @@ -157,21 +152,6 @@ def service_directory
::Protobuf::Rpc::ServiceDirectory.instance
end

# Setup a ZMQ request socket in the current zmq context.
#
def socket
@socket ||= zmq_context.socket(::ZMQ::REQ)
end

def socket_close
if socket
log_debug { sign_message("Closing Socket") }
zmq_error_check(socket.close, :socket_close)
log_debug { sign_message("Socket closed") }
@socket = nil
end
end

# Return the ZMQ Context to use for this process.
# If the context does not exist, create it, then register
# an exit block to ensure the context is terminated correctly.
Expand All @@ -189,7 +169,6 @@ def zmq_error_check(return_code, source)
ERROR
end
end

end
end
end
Expand Down
10 changes: 1 addition & 9 deletions lib/protobuf/rpc/servers/zmq/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,7 @@ def beacon_interval
end

def beacon_ip
unless @beacon_ip
unless address = options[:beacon_address]
address = ::Protobuf::Rpc::ServiceDirectory.address
end

@beacon_ip = resolve_ip(address)
end

@beacon_ip
"255.255.255.255"
end

def beacon_port
Expand Down
4 changes: 4 additions & 0 deletions lib/protobuf/rpc/servers/zmq_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ def register_signals
@server.add_worker
log_info { "Increased worker size to: #{@server.total_workers}" }
end

trap(:TTOU) do
log_info { "Current worker size: #{@server.workers.size}" }
end
end
end
end
Expand Down

0 comments on commit 9bde318

Please sign in to comment.