Skip to content

Commit

Permalink
Support message containers introduced in RabbitMQ 3.13.0
Browse files Browse the repository at this point in the history
This commit makes the plugin depend on the `mc` module and hence
3.13.0+ at runtime.

Fixes noxdafox#108
  • Loading branch information
gomoripeti committed Jun 26, 2024
1 parent 2591f9d commit 114d6b0
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
elixir:
- '1.16.3'
rmqref:
- v3.12.x
- v3.13.x
steps:
- uses: actions/checkout@v4
- name: Install Erlang and Elixir
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ Then copy all the *.ez files inside the plugins folder to the [RabbitMQ plugins
[sudo] rabbitmq-plugins enable rabbitmq_message_deduplication
```

## Version requirements

The latest version of the plugin requires RabbitMQ 3.13.0.

Earlier RabbitMQ versions are supported by 0.6.2.

## Exchange level deduplication

The exchange type `x-message-deduplication` allows to filter message duplicates before any routing rule is applied.
Expand Down
40 changes: 20 additions & 20 deletions lib/rabbitmq_message_deduplication/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,9 @@ defmodule RabbitMQMessageDeduplication.Common do

require RabbitMQMessageDeduplication.Cache

alias :rabbit_binary_parser, as: RabbitBinaryParser
alias :mc, as: MC
alias RabbitMQMessageDeduplication.Cache, as: Cache

defrecord :content, extract(
:content, from_lib: "rabbit_common/include/rabbit.hrl")

@type basic_message :: record(:basic_message)
defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_properties, :P_basic, extract(
:P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl")

@default_arguments %{type: nil, default: nil}

@doc """
Expand All @@ -55,14 +45,24 @@ defmodule RabbitMQMessageDeduplication.Common do
@doc """
Retrieve the given header from the message.
"""
@spec message_header(basic_message, String.t) :: String.t | nil
def message_header(basic_message(content: message_content), header) do
message_content = RabbitBinaryParser.ensure_content_decoded(message_content)

case content(message_content, :properties) do
basic_properties(headers: headers) when is_list(headers) ->
rabbit_keyfind(headers, header)
basic_properties(headers: :undefined) -> nil
@spec message_header(MC.state, String.t) :: String.t | integer() | float() | boolean() | :undefined | nil
def message_header(message, header) do
case MC.x_header(header, message) do
{_type, value} when not is_list(value) and not is_tuple(value) ->
# list and tuple values have type-tagged elements
# that would need to be untagged recursively
# we don't expect to use such headers, so those cases are not handled
value
:null ->
# header value in AMQP message was {:void, :undefined}

# pre-3.13 version of this function used rabbit_keyfind/2
# which returned :undefined instead of nil or :void. We have to
# keep this value as this is used in keys to cache the message
# and is preserved during a rolling upgrade in a replicated
# Mnesia table
:undefined
:undefined -> nil
end
end

Expand All @@ -71,7 +71,7 @@ defmodule RabbitMQMessageDeduplication.Common do
If not, it adds it to the cache with the corresponding name.
"""
@spec duplicate?(tuple, basic_message, integer | nil) :: boolean
@spec duplicate?(tuple, MC.state, integer | nil) :: boolean
def duplicate?(name, message, ttl \\ nil) do
cache = cache_name(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ defmodule RabbitMQMessageDeduplication.Exchange do
defrecord :exchange, extract(
:exchange, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :delivery, extract(
:delivery, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

@doc """
Register the exchange type within the Broker.
"""
Expand Down Expand Up @@ -90,7 +84,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
end

@impl :rabbit_exchange_type
def route(exchange(name: name), delivery(message: msg = basic_message())) do
def route(exchange(name: name), msg, _opts) do
if route?(name, msg) do
RabbitRouter.match_routing_key(name, [:_])
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ defmodule RabbitMQMessageDeduplication.Queue do
"""

import Record, only: [defrecord: 2, defrecord: 3, extract: 2]
import Record, only: [defrecord: 2]

require RabbitMQMessageDeduplication.Cache
require RabbitMQMessageDeduplication.Common

alias :amqqueue, as: AMQQueue
alias :rabbit_log, as: RabbitLog
alias :rabbit_amqqueue, as: RabbitQueue
alias :mc, as: MC
alias RabbitMQMessageDeduplication.Common, as: Common
alias RabbitMQMessageDeduplication.Cache, as: Cache
alias RabbitMQMessageDeduplication.CacheManager, as: CacheManager
Expand All @@ -46,15 +47,6 @@ defmodule RabbitMQMessageDeduplication.Queue do
{:requires, :kernel_ready},
{:enables, :core_initialized}]}

defrecord :content, extract(
:content, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_message, extract(
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")

defrecord :basic_properties, :P_basic, extract(
:P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl")

defrecord :dqack, [:tag, :header]
defrecord :dqstate, [:queue, :queue_state, dedup_enabled: false]

Expand Down Expand Up @@ -286,8 +278,9 @@ defmodule RabbitMQMessageDeduplication.Queue do
if dedup_queue?(state) do
case fetch(need_ack, state) do
{:empty, state} -> {:empty, state}
{{message = basic_message(id: id), _, ack_tag}, state} ->
{{message, _, ack_tag}, state} ->
maybe_delete_cache_entry(queue, message)
id = MC.get_annotation(:id, message)

{{id, ack_tag}, state}
end
Expand Down Expand Up @@ -521,7 +514,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
end

# Returns true if the message is a duplicate.
defp duplicate?(queue, message = basic_message()) do
defp duplicate?(queue, message) do
name = AMQQueue.get_name(queue)

if Common.duplicate?(name, message, message_expiration(message)) do
Expand All @@ -533,18 +526,14 @@ defmodule RabbitMQMessageDeduplication.Queue do

# Returns the expiration property of the given message
defp message_expiration(message) do
basic_message(content: content(properties: properties)) = message

case properties do
basic_properties(expiration: ttl) when is_bitstring(ttl) ->
String.to_integer(ttl)
basic_properties(expiration: :undefined) -> nil
case MC.ttl(message) do
:undefined -> nil
ttl -> ttl
end
end

# Removes the message deduplication header from the cache
defp maybe_delete_cache_entry(queue, msg = basic_message()) do
defp maybe_delete_cache_entry(queue, msg) when is_tuple(msg) do
header = Common.message_header(msg, "x-deduplication-header")
maybe_delete_cache_entry(queue, header)
end
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Mixfile do
applications: [:mnesia],
extra_applications: [:rabbit],
mod: {RabbitMQMessageDeduplication, []},
registered: [RabbitMQMessageDeduplication]
registered: [RabbitMQMessageDeduplication],
broker_version_requirements: ["3.13.0"]
]
end

Expand Down

0 comments on commit 114d6b0

Please sign in to comment.