Skip to content

Commit

Permalink
Drop non-close/close_ok methods per AMQP stability rule when closing …
Browse files Browse the repository at this point in the history
…due to input framing error.

Fixed up/improved a number of tests, inluding two with duplicate names.
Don't override system-level exceptions (e.g., SystemExit, KeyboardException) in Channel.process_frames()

Implemented clean-up exception handling in Channel.process_frames to preserve exception/traceback of original exception, but allowing system-exiting exceptions to override.
Implemented unit test method test_process_frames_raises_systemexit_when_close_raises_systemexit.
Updated names of a couple of test methods to reflect what they do (implementaiton must have changed since they were originally named).

Cache connection's logger in Channel's constructor so that the logger is still available after `_closed_cb` (e.g., for logging an exception from a channel-close listener)

Implemented channel test test_process_frames_passes_through_exception_from_close_listener
  • Loading branch information
Vitaly Kruglikov committed Jan 11, 2016
1 parent e5320ab commit a3ce855
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 51 deletions.
49 changes: 43 additions & 6 deletions haigha/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from haigha.frames.frame import Frame
from haigha.frames.content_frame import ContentFrame
from haigha.frames.header_frame import HeaderFrame
from haigha.frames.method_frame import MethodFrame
from haigha.exceptions import ChannelError, ChannelClosed, ConnectionClosed

# Defined here so it's easier to test
Expand Down Expand Up @@ -57,6 +58,8 @@ def __init__(self, connection, channel_id, class_map, **kwargs):
'''
self._connection = connection
self._channel_id = channel_id
# Save logger so that we have access to it even after _closed_cb
self._logger = connection.logger

self._class_map = {}
for _id, _class in class_map.iteritems():
Expand All @@ -76,6 +79,11 @@ def __init__(self, connection, channel_id, class_map, **kwargs):
# Listeners for when channel closes
self._close_listeners = set()

# Set when we initiate channel.close following framing error; when set,
# we drop all incoming frames except basic.close and basic.close_ok per
# AMQP 0.9.1 stability rule; see `Channel.process_frames()`
self._emergency_close_pending = False

# Moving state out of protocol class so that it's accessible even
# after we've closed and deleted references to the protocol classes.
# Note though that many of these fields are written to directly
Expand All @@ -102,7 +110,7 @@ def channel_id(self):
@property
def logger(self):
'''Return a shared logger handle for the channel.'''
return self._connection.logger
return self._logger

@property
def closed(self):
Expand Down Expand Up @@ -227,24 +235,53 @@ def process_frames(self):
Process the input buffer.
'''
while len(self._frame_buffer):
# It would make sense to call next_frame, but it's
# technically faster to repeat the code here.
frame = self._frame_buffer.popleft()

if self._emergency_close_pending:
# Implement stability rule from AMQP 0.9.1 section 1.5.2.5.
# Method channel.close: "After sending this method, any
# received methods except Close and Close-OK MUST be discarded."
#
# NOTE: presently, we limit our implementation of the rule to
# the "emergency close" scenario to avoid potential adverse
# side-effect during normal user-initiated close
if (not isinstance(frame, MethodFrame) or
frame.class_id != 20 or
frame.method_id not in (40, 41)):
self.logger.warn("Emergency channel close: dropping input "
"frame %.255s", frame)
continue
try:
# It would make sense to call next_frame, but it's
# technically faster to repeat the code here.
frame = self._frame_buffer.popleft()
self.dispatch(frame)
except ProtocolClass.FrameUnderflow:
return
except (ConnectionClosed, ChannelClosed):
# Immediately raise if connection or channel is closed
raise
except Exception:
self.logger.exception(
"Closing on failed dispatch of frame %.255s", frame)

# Spec says that channel should be closed if there's a framing
# error. Unsure if we can send close if the current exception
# is transport level (e.g. gevent.GreenletExit)
self._emergency_close_pending = True

# Preserve the original exception and traceback during cleanup,
# only allowing system-exiting exceptions (e.g., SystemExit,
# KeyboardInterrupt) to override it
try:
self.close(500, "Failed to dispatch %s" % (str(frame)))
finally:
raise
finally:
try:
self.close(500, "Failed to dispatch %s" % (str(frame)))
except Exception:
# Suppress secondary non-system-exiting exception in
# favor of the original exception
self.logger.exception("Channel close failed")
pass

def next_frame(self):
'''
Expand Down
9 changes: 7 additions & 2 deletions haigha/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,11 @@ def read_frames(self):
(self._close_info['reply_code'],
self._close_info['reply_text']))

self._transport.process_channels(p_channels)

# NOTE: we process channels after buffering unused data in order to
# preserve the integrity of the input stream in case a channel needs to
# read input, such as when a channel framing error necessitates the use
# of the synchronous channel.close method. See `Channel.process_frames`.
#
# HACK: read the buffer contents and re-buffer. Would prefer to pass
# buffer back, but there's no good way of asking the total size of the
# buffer, comparing to tell(), and then re-buffering. There's also no
Expand All @@ -452,6 +455,8 @@ def read_frames(self):
if reader.tell() < len(data):
self._transport.buffer(data[reader.tell():])

self._transport.process_channels(p_channels)

def _flush_buffered_frames(self):
'''
Callback when protocol has been initialized on channel 0 and we're
Expand Down
Loading

0 comments on commit a3ce855

Please sign in to comment.