diff --git a/CHANGELOG.md b/CHANGELOG.md index ad00110..9629a9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Specify cacert/cafile/keyfile via URI parameters +### Fixed + +- `Channel` methods didn't raise `Connection::ClosedException` if connection had been closed by server. + ## [1.2.5] - 2024-06-17 ### Fixed diff --git a/spec/amqp-client_spec.cr b/spec/amqp-client_spec.cr index ab903f9..6f2f68f 100644 --- a/spec/amqp-client_spec.cr +++ b/spec/amqp-client_spec.cr @@ -278,16 +278,15 @@ describe AMQP::Client do end end - it "should set connection name" do + it "should set connection name", tags: "slow" do AMQP::Client.start(name: "My Name") do |_| names = Array(String).new - 5.times do - HTTP::Client.get("http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672/api/connections") do |resp| - conns = JSON.parse resp.body_io - names = conns.as_a.map &.dig("client_properties", "connection_name") + with_http_api do |api| + 5.times do + names = api.connections.map &.dig("client_properties", "connection_name") break if names.includes? "My name" + sleep 1 end - sleep 1 end names.should contain "My Name" end @@ -412,4 +411,38 @@ describe AMQP::Client do c.update_secret("foobar", "no reason") end end + + describe "Channel raises Connection::ClosedException", tags: "slow" do + it "#basic_consume block=true" do + with_channel do |ch| + q = ch.queue + q.publish "foobar" + expect_raises(AMQP::Client::Connection::ClosedException) do + ch.basic_consume q.name, block: true do + with_http_api &.close_connections(1) + end + end + end + end + + it "#basic_publish" do + with_channel do |ch| + with_http_api &.close_connections(1) + sleep 1 # Wait for connection to be closed + expect_raises(AMQP::Client::Connection::ClosedException) do + ch.basic_publish "", "", "foobar" + end + end + end + + it "#basic_publish_confirm" do + with_channel do |ch| + with_http_api &.close_connections(1) + sleep 1 # Wait for connection to be closed + expect_raises(AMQP::Client::Connection::ClosedException) do + ch.basic_publish_confirm "", "", "foobar" + end + end + end + end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 4814d01..9fe23bb 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -34,6 +34,41 @@ module TestHelpers yield c.channel end end + + struct ManagementApi + def initialize(uri : URI) + @http = HTTP::Client.new uri + username = uri.user || "guest" + password = uri.password || "guest" + @http.basic_auth username, password + end + + def connections + get("/api/connections/") do |resp| + JSON.parse(resp.body_io).as_a + end + end + + def close_connections(amount : Int) + loop do + conns = connections + conns.each do |conn| + name = conn["name"].as_s + delete("/api/connections/#{URI.encode_path_segment name}") + amount -= 1 + end + break if amount <= 0 + sleep 1 + end + end + + forward_missing_to @http + end + + def with_http_api(&) + uri = URI.parse ENV.fetch("MGMT_URL", "http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672") + yield ManagementApi.new(uri) + end end extend TestHelpers diff --git a/src/amqp-client/channel.cr b/src/amqp-client/channel.cr index 58a3ba6..e9b3698 100644 --- a/src/amqp-client/channel.cr +++ b/src/amqp-client/channel.cr @@ -268,7 +268,7 @@ class AMQP::Client def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "", mandatory = false, immediate = false, props properties = Properties.new, blk : Proc(Bool, Nil)? = nil) : UInt64 - raise ClosedException.new(@closing_frame) if @closing_frame + raise_if_closed @connection.with_lock(flush: !@tx) do |c| if blk && !@confirm_mode @@ -309,7 +309,7 @@ class AMQP::Client waiting_fiber.enqueue end sleep - raise ClosedException.new(@closing_frame) if @closing_frame + raise_if_closed confirmed end @@ -321,7 +321,7 @@ class AMQP::Client waiting_fiber.enqueue end sleep - raise ClosedException.new(@closing_frame) if @closing_frame + raise_if_closed confirmed end @@ -333,11 +333,8 @@ class AMQP::Client return true if @unconfirmed_publishes.empty? ok = @unconfirmed_empty.receive unless ok - if frame = @closing_frame - raise ClosedException.new(frame) - else - raise Error.new("BUG: got nack without closing frame") - end + raise_if_closed + raise Error.new("BUG: got nack without closing frame") end ok end @@ -383,13 +380,8 @@ class AMQP::Client write Frame::Basic::Get.new(@id, 0_u16, queue, no_ack) @basic_get.receive rescue ex : ::Channel::ClosedError - if cf = @connection.closing_frame - raise Connection::ClosedException.new(cf) - elsif cf = @closing_frame - raise ClosedException.new(cf, cause: ex) - else - raise ex - end + raise_if_closed(cause: ex) + raise ex end # :nodoc: @@ -430,7 +422,7 @@ class AMQP::Client raise ex end end - raise ClosedException.new(@closing_frame) if @closing_frame + raise_if_closed else done.close end @@ -681,18 +673,15 @@ class AMQP::Client end private def write(frame) - raise ClosedException.new(@closing_frame) if @closing_frame + raise_if_closed @connection.write frame end private def next_frame : Frame @reply_frames.receive rescue ex : ::Channel::ClosedError - if conn_close = @connection.closing_frame - raise Connection::ClosedException.new(conn_close) - else - raise ClosedException.new(@closing_frame, cause: ex) - end + raise_if_closed(ex) + raise Error.new("BUG: Channel::ClosedError but not closing frame") end private macro expect(clz) @@ -700,6 +689,15 @@ class AMQP::Client frame.as?({{ clz }}) || raise Error::UnexpectedFrame.new(frame) end + private def raise_if_closed(cause ex : Exception? = nil) + if frame = @closing_frame + raise ClosedException.new frame, cause: ex + end + if frame = @connection.closing_frame + raise Connection::ClosedException.new frame, cause: ex + end + end + def inspect(io : IO) : Nil io << "#<" << self.class.name << " @id=" << @id << '>' end diff --git a/src/amqp-client/errors.cr b/src/amqp-client/errors.cr index 016dc0e..05f43e1 100644 --- a/src/amqp-client/errors.cr +++ b/src/amqp-client/errors.cr @@ -17,8 +17,8 @@ class AMQP::Client super(message, cause) end - def initialize(frame : Frame::Connection::Close) - super("#{frame.reply_text} (#{frame.reply_code})") + def initialize(frame : Frame::Connection::Close, cause : Exception? = nil) + super("#{frame.reply_text} (#{frame.reply_code})", cause) end def initialize(message, host, user, vhost)