forked from vsouz4/azure-uamqp-php
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProducer.cpp
88 lines (73 loc) · 2.64 KB
/
Producer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include "Producer.h"
#include "azure_c_shared_utility/platform.h"
#include "azure_c_shared_utility/tlsio.h"
#include "azure_c_shared_utility/socketio.h"
#include "azure_uamqp_c/uamqp.h"
#include "Session.h"
#include "Message.h"
// Count of send-completion callbacks observed; publish() zeroes it before
// sending and on_message_send_complete() increments it.
static unsigned int sent_messages{0};

// Raised by the link-detach callback so that publish() leaves its work loop.
static bool producerStopRunning{false};

// Text accumulated by the link-detach callback; publish() throws it as a
// Php::Exception when it is non-empty. Not declared static, so it keeps
// external linkage.
std::string producerExceptionMessage;
static void on_link_detach_received_producer(void* context, ERROR_HANDLE error)
{
(void)error;
const char* condition = NULL;
const char* description = NULL;
error_get_condition(error, &condition);
error_get_description(error, &description);
producerExceptionMessage += "(" + std::string(condition) + ") " + std::string(description);
producerStopRunning = true;
}
/**
 * Send-completion callback passed to messagesender_send_async().
 *
 * Only records that a completion happened by bumping sent_messages; the
 * result code, the context and the delivery state are all ignored, so a
 * failed send is counted exactly like a successful one.
 */
static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result, AMQP_VALUE delivery_state)
{
    // None of the arguments are inspected.
    (void)context;
    (void)send_result;
    (void)delivery_state;

    sent_messages += 1;
}
/**
 * Builds a sender link and message sender for the given resource.
 *
 * The link is named "sender-link", uses the source address "ingress" and the
 * target address "amqps://<host>/<resourceName>", is configured for unsettled
 * sends with a maximum message size of 65536, and has
 * on_link_detach_received_producer subscribed for detach notifications.
 *
 * @param session      Session on which the link is created; stored for later use.
 * @param resourceName Name appended to the connection host to form the target address.
 * @throws Php::Exception if the link or the message sender cannot be created.
 */
Producer::Producer(Session *session, std::string resourceName)
{
    this->session = session;
    this->resourceName = resourceName;

    source = messaging_create_source("ingress");
    target = messaging_create_target(("amqps://" + session->getConnection()->getHost() + "/" + resourceName).c_str());

    link = link_create(session->getSessionHandler(), "sender-link", role_sender, source, target);
    if (link == NULL) {
        // Release the address values before reporting the failure; the
        // constructor is about to unwind and nothing else will free them.
        amqpvalue_destroy(source);
        amqpvalue_destroy(target);
        throw Php::Exception("Could not create link");
    }

    link_set_snd_settle_mode(link, sender_settle_mode_unsettled);
    (void)link_set_max_message_size(link, 65536);
    link_subscribe_on_link_detach_received(link, on_link_detach_received_producer, session);

    amqpvalue_destroy(source);
    amqpvalue_destroy(target);

    /* create a message sender */
    message_sender = messagesender_create(link, NULL, NULL);
    if (message_sender == NULL) {
        // Throwing from a constructor skips the destructor, so the link
        // created above has to be released here to avoid leaking it.
        link_destroy(link);
        link = NULL;
        throw Php::Exception("Could not create message sender");
    }

    if (session->getConnection()->isDebugOn()) {
        messagesender_set_trace(message_sender, true);
    }
}
/**
 * Sends one message and blocks until its completion callback fires or the
 * link is detached, then tears everything down.
 *
 * After the wait, the message handle, the message sender and the link are
 * destroyed and the session and connection are closed, regardless of the
 * outcome.
 *
 * @param message Message whose handle is sent; the handle is destroyed by this call.
 * @throws Php::Exception if the sender cannot be opened, if the message cannot
 *         be queued, or if a link detach was reported while waiting.
 */
void Producer::publish(Message *message)
{
    MESSAGE_HANDLE msg = message->getMessageHandler();
    sent_messages = 0;

    if (messagesender_open(message_sender) != 0) {
        throw Php::Exception("Error creating messaging sender");
    }

    // A NULL handle means the message was never queued, so no completion
    // callback will ever arrive; waiting for one would spin until a detach.
    const bool queued =
        messagesender_send_async(message_sender, msg, on_message_send_complete, msg, 10000) != NULL;

    while (queued && !producerStopRunning) {
        session->getConnection()->doWork();
        if (sent_messages >= 1) {
            break;
        }
    }

    message_destroy(msg);
    messagesender_destroy(message_sender);
    link_destroy(link);
    session->close();
    session->getConnection()->close();

    // Take ownership of any detach error and reset the file-level state so a
    // failure here does not leak into later publishes in the same process.
    std::string failure;
    failure.swap(producerExceptionMessage);
    producerStopRunning = false;

    if (failure.empty() && !queued) {
        failure = "Could not queue message for sending";
    }
    if (!failure.empty()) {
        throw Php::Exception(failure);
    }
}