-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDataWriter_Aux2Strings.cpp
137 lines (98 loc) · 3.19 KB
/
DataWriter_Aux2Strings.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include "DataWriter_Aux2Strings.h"
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/WaitSet.h>
#include <iostream>
#include "MriTimeSync.h"
using std::cerr;
using std::cout;
using std::endl;
using std::string;
DataWriter_Aux2Strings::DataWriter_Aux2Strings(DDS::DomainParticipant_var participant, DDS::Publisher_var publisher, const char * topic_name)
{
this->participant = participant;
this->publisher = publisher;
this->topic = createTopic(topic_name);
this->writer = createDataWriter();
this->msg_writer = Mri::Aux2StringsDataWriter::_narrow(writer.in());
writer_global_aux2strings = msg_writer;
waitForSubscriber();
}
DataWriter_Aux2Strings::~DataWriter_Aux2Strings()
{
}
void DataWriter_Aux2Strings::waitForSubscriber() {
// Block until Subscriber is available
DDS::StatusCondition_var condition = writer->get_statuscondition();
condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
while (true) {
DDS::PublicationMatchedStatus matches;
if (writer->get_publication_matched_status(matches) != ::DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR,ACE_TEXT("ERROR: %N:%l: main() - get_publication_matched_status failed!\n")),-1);
}
if (matches.current_count >= 1) {
break;
}
DDS::ConditionSeq conditions;
DDS::Duration_t timeout = { 60, 0 };
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: main() -")
ACE_TEXT(" wait failed!\n")),
-1);
}
}
ws->detach_condition(condition);
}
void DataWriter_Aux2Strings::sendMessage(const Mri::Aux2Strings& message) {
int success = msg_writer->write(message, DDS::HANDLE_NIL);
if (success != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Publisher::sendMessage write returned %d.\n"), success));
}
}
DDS::Topic_var
DataWriter_Aux2Strings::createTopic(const char * topic_name)
{
// Register TypeSupport (Messenger::Message)
Mri::Aux2StringsTypeSupport_var ts =
new Mri::Aux2StringsTypeSupportImpl;
if (ts->register_type(participant, "") != DDS::RETCODE_OK) {
throw std::string("failed to register type support");
}
// Create Topic (Mri_Control)
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic(topic_name,
type_name,
TOPIC_QOS_DEFAULT,
DDS::TopicListener::_nil(),
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
// Check for failure
if (!topic) {
throw std::string("failed to create topic");
}
return topic;
}
DDS::DataWriter_var
DataWriter_Aux2Strings::createDataWriter()
{
DDS::DataWriterQos dw_qos;
publisher->get_default_datawriter_qos(dw_qos);
dw_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
dw_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
dw_qos.reliability.max_blocking_time.sec = 10;
dw_qos.reliability.max_blocking_time.nanosec = 0;
dw_qos.resource_limits.max_samples_per_instance = 100;
// Create DataWriter
DDS::DataWriter_var writer =
publisher->create_datawriter(topic,
dw_qos,
DDS::DataWriterListener::_nil(),
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
// Check for failure
if (!writer) {
throw std::string("failed to create data writer");
}
return writer;
}