Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly set basic.qos.global #313

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,11 +939,29 @@ std::string Channel::BasicConsume(const std::string &queue,
return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive,
message_prefetch_count, Table());
}

std::string Channel::BasicConsume(const std::string &queue,
const std::string &consumer_tag,
bool no_local, bool no_ack, bool exclusive,
boost::uint16_t message_prefetch_count, bool global_qos) {
return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive,
message_prefetch_count, global_qos, Table());
}

std::string Channel::BasicConsume(const std::string &queue,
const std::string &consumer_tag,
bool no_local, bool no_ack, bool exclusive,
boost::uint16_t message_prefetch_count,
const Table &arguments) {
return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive,
message_prefetch_count, m_impl->BrokerHasNewQosBehavior(), Table());
}

std::string Channel::BasicConsume(const std::string &queue,
const std::string &consumer_tag,
bool no_local, bool no_ack, bool exclusive,
boost::uint16_t message_prefetch_count, bool global_qos,
const Table &arguments) {
m_impl->CheckIsConnected();
amqp_channel_t channel = m_impl->GetChannel();

Expand All @@ -954,7 +972,7 @@ std::string Channel::BasicConsume(const std::string &queue,
amqp_basic_qos_t qos = {};
qos.prefetch_size = 0;
qos.prefetch_count = message_prefetch_count;
qos.global = m_impl->BrokerHasNewQosBehavior();
qos.global = global_qos;

m_impl->DoRpcOnChannel(channel, AMQP_BASIC_QOS_METHOD, &qos, QOS_OK);
m_impl->MaybeReleaseBuffersOnChannel(channel);
Expand Down Expand Up @@ -990,6 +1008,11 @@ std::string Channel::BasicConsume(const std::string &queue,

void Channel::BasicQos(const std::string &consumer_tag,
boost::uint16_t message_prefetch_count) {
BasicQos(consumer_tag, message_prefetch_count, m_impl->BrokerHasNewQosBehavior());
}

void Channel::BasicQos(const std::string &consumer_tag,
boost::uint16_t message_prefetch_count, bool globalQos) {
m_impl->CheckIsConnected();
amqp_channel_t channel = m_impl->GetConsumerChannel(consumer_tag);

Expand All @@ -998,7 +1021,7 @@ void Channel::BasicQos(const std::string &consumer_tag,
amqp_basic_qos_t qos = {};
qos.prefetch_size = 0;
qos.prefetch_count = message_prefetch_count;
qos.global = m_impl->BrokerHasNewQosBehavior();
qos.global = globalQos;

m_impl->DoRpcOnChannel(channel, AMQP_BASIC_QOS_METHOD, &qos, QOS_OK);
m_impl->MaybeReleaseBuffersOnChannel(channel);
Expand Down
75 changes: 75 additions & 0 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,34 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
bool exclusive = true,
boost::uint16_t message_prefetch_count = 1);

/**
* Starts consuming Basic messages on a queue
*
* Subscribes as a consumer to a queue, so all future messages on a queue
* will be Basic.Delivered
* @note Due to a limitation to how things are done, it is only possible to
* reliably have **a single consumer per channel**; calling this
* more than once per channel may result in undefined results.
* @param queue The name of the queue to subscribe to.
* @param consumer_tag The name of the consumer. This is used to do
* operations with a consumer.
* @param no_local Defaults to true
* @param no_ack If `true`, ack'ing the message is automatically done when the
* message is delivered. Defaults to `true` (message does not have to be
* ack'ed).
* @param exclusive Means only this consumer can access the queue.
* @param message_prefetch_count Number of unacked messages the broker will
* deliver. Setting this to more than 1 will allow the broker to deliver
* messages while a current message is being processed. A value of
* 0 means no limit. This option is ignored if `no_ack = true`.
* @param global_qos Sets basic.qos.global explicitly.
* @returns the consumer tag
*/
std::string BasicConsume(const std::string &queue,
const std::string &consumer_tag, bool no_local,
bool no_ack, bool exclusive,
boost::uint16_t message_prefetch_count, bool global_qos);

/**
* Starts consuming Basic messages on a queue
*
Expand Down Expand Up @@ -803,6 +831,37 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
boost::uint16_t message_prefetch_count,
const Table &arguments);

/**
* Starts consuming Basic messages on a queue
*
* Subscribes as a consumer to a queue, so all future messages on a queue
* will be Basic.Delivered
* @note Due to a limitation to how things are done, it is only possible to
* reliably have **a single consumer per channel**; calling this
* more than once per channel may result in undefined results.
* @param queue The name of the queue to subscribe to.
* @param consumer_tag The name of the consumer. This is used to do
* operations with a consumer.
* @param no_local Defaults to true
* @param no_ack If `true`, ack'ing the message is automatically done when
* the message is delivered. Defaults to `true` (message does not have to be
* ack'ed).
* @param exclusive Means only this consumer can access the queue.
* @param message_prefetch_count Number of unacked messages the broker will
* deliver. Setting this to more than 1 will allow the broker to deliver
* messages while a current message is being processed. A value of
* 0 means no limit. This option is ignored if `no_ack = true`.
* @param global_qos Sets basic.qos.global explicitly.
* @param arguments A table of additional arguments when creating the
* consumer
* @returns the consumer tag
*/
std::string BasicConsume(const std::string &queue,
const std::string &consumer_tag, bool no_local,
bool no_ack, bool exclusive,
boost::uint16_t message_prefetch_count, bool global_qos,
const Table &arguments);

/**
* Modify consumer's message prefetch count
*
Expand All @@ -818,6 +877,22 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable {
void BasicQos(const std::string &consumer_tag,
boost::uint16_t message_prefetch_count);

/**
* Modify consumer's message prefetch count
*
* Sets the number of unacknowledged messages that will be delivered
* by the broker to a consumer.
*
* Has no effect for consumer with `no_ack` set.
*
* @param consumer_tag The consumer tag to adjust the prefetch for.
* @param message_prefetch_count The number of unacknowledged message the
* @param globalQos Sets basic.qos.global explicitly.
* broker will deliver. A value of 0 means no limit.
*/
void BasicQos(const std::string &consumer_tag,
boost::uint16_t message_prefetch_count, bool globalQos);

/**
* Cancels a previously created Consumer
*
Expand Down