Skip to content

Commit

Permalink
Make channel raise Connection::ClosedException (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Aug 23, 2024
1 parent 1ffc86d commit 6708526
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Specify cacert/cafile/keyfile via URI parameters

### Fixed

- `Channel` methods didn't raise `Connection::ClosedException` if connection had been closed by server.

## [1.2.5] - 2024-06-17

### Fixed
Expand Down
45 changes: 39 additions & 6 deletions spec/amqp-client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,15 @@ describe AMQP::Client do
end
end

it "should set connection name" do
it "should set connection name", tags: "slow" do
AMQP::Client.start(name: "My Name") do |_|
names = Array(String).new
5.times do
HTTP::Client.get("http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672/api/connections") do |resp|
conns = JSON.parse resp.body_io
names = conns.as_a.map &.dig("client_properties", "connection_name")
with_http_api do |api|
5.times do
names = api.connections.map &.dig("client_properties", "connection_name")
break if names.includes? "My name"
sleep 1
end
sleep 1
end
names.should contain "My Name"
end
Expand Down Expand Up @@ -412,4 +411,38 @@ describe AMQP::Client do
c.update_secret("foobar", "no reason")
end
end

describe "Channel raises Connection::ClosedException", tags: "slow" do
it "#basic_consume block=true" do
with_channel do |ch|
q = ch.queue
q.publish "foobar"
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_consume q.name, block: true do
with_http_api &.close_connections(1)
end
end
end
end

it "#basic_publish" do
with_channel do |ch|
with_http_api &.close_connections(1)
sleep 1 # Wait for connection to be closed
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_publish "", "", "foobar"
end
end
end

it "#basic_publish_confirm" do
with_channel do |ch|
with_http_api &.close_connections(1)
sleep 1 # Wait for connection to be closed
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_publish_confirm "", "", "foobar"
end
end
end
end
end
35 changes: 35 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@ module TestHelpers
yield c.channel
end
end

struct ManagementApi
def initialize(uri : URI)
@http = HTTP::Client.new uri
username = uri.user || "guest"
password = uri.password || "guest"
@http.basic_auth username, password
end

def connections
get("/api/connections/") do |resp|
JSON.parse(resp.body_io).as_a
end
end

def close_connections(amount : Int)
loop do
conns = connections
conns.each do |conn|
name = conn["name"].as_s
delete("/api/connections/#{URI.encode_path_segment name}")
amount -= 1
end
break if amount <= 0
sleep 1
end
end

forward_missing_to @http
end

def with_http_api(&)
uri = URI.parse ENV.fetch("MGMT_URL", "http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672")
yield ManagementApi.new(uri)
end
end

extend TestHelpers
42 changes: 20 additions & 22 deletions src/amqp-client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class AMQP::Client
def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "",
mandatory = false, immediate = false, props properties = Properties.new,
blk : Proc(Bool, Nil)? = nil) : UInt64
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed

@connection.with_lock(flush: !@tx) do |c|
if blk && !@confirm_mode
Expand Down Expand Up @@ -309,7 +309,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -321,7 +321,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -333,11 +333,8 @@ class AMQP::Client
return true if @unconfirmed_publishes.empty?
ok = @unconfirmed_empty.receive
unless ok
if frame = @closing_frame
raise ClosedException.new(frame)
else
raise Error.new("BUG: got nack without closing frame")
end
raise_if_closed
raise Error.new("BUG: got nack without closing frame")
end
ok
end
Expand Down Expand Up @@ -383,13 +380,8 @@ class AMQP::Client
write Frame::Basic::Get.new(@id, 0_u16, queue, no_ack)
@basic_get.receive
rescue ex : ::Channel::ClosedError
if cf = @connection.closing_frame
raise Connection::ClosedException.new(cf)
elsif cf = @closing_frame
raise ClosedException.new(cf, cause: ex)
else
raise ex
end
raise_if_closed(cause: ex)
raise ex
end

# :nodoc:
Expand Down Expand Up @@ -430,7 +422,7 @@ class AMQP::Client
raise ex
end
end
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
else
done.close
end
Expand Down Expand Up @@ -681,25 +673,31 @@ class AMQP::Client
end

private def write(frame)
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
@connection.write frame
end

private def next_frame : Frame
@reply_frames.receive
rescue ex : ::Channel::ClosedError
if conn_close = @connection.closing_frame
raise Connection::ClosedException.new(conn_close)
else
raise ClosedException.new(@closing_frame, cause: ex)
end
raise_if_closed(ex)
raise Error.new("BUG: Channel::ClosedError but not closing frame")
end

private macro expect(clz)
frame = next_frame
frame.as?({{ clz }}) || raise Error::UnexpectedFrame.new(frame)
end

private def raise_if_closed(cause ex : Exception? = nil)
if frame = @closing_frame
raise ClosedException.new frame, cause: ex
end
if frame = @connection.closing_frame
raise Connection::ClosedException.new frame, cause: ex
end
end

def inspect(io : IO) : Nil
io << "#<" << self.class.name << " @id=" << @id << '>'
end
Expand Down
4 changes: 2 additions & 2 deletions src/amqp-client/errors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class AMQP::Client
super(message, cause)
end

def initialize(frame : Frame::Connection::Close)
super("#{frame.reply_text} (#{frame.reply_code})")
def initialize(frame : Frame::Connection::Close, cause : Exception? = nil)
super("#{frame.reply_text} (#{frame.reply_code})", cause)
end

def initialize(message, host, user, vhost)
Expand Down

0 comments on commit 6708526

Please sign in to comment.