-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathOpenDDST.cpp
340 lines (223 loc) · 7.5 KB
/
OpenDDST.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/Service_Participant.h>
#include <string>
#include <thread>
#include <iostream> //cout
#include <ctime> //time format
#include <conio.h> //key press
#include <map> //map - list of vehicles
#include "MriTypeSupportImpl.h"
#include "DataWriter_Aux2Strings.h"
#include "DataReader_Aux2Strings.h"
#include "DataReader_V2XMessage.h"
#include "DataWriter_V2XMessage.h"
#include "DataReader_VehData.h"
#include "QueueTs.h"
#include "MriTimeSync.h"
#include "OpenDDS.h"
#include "V2xApps.h"
using std::cerr;
using std::cout;
using std::endl;
using std::string;
// ---------------------------------------------------------------------------
// File-scope state shared between the threads defined in this file.
// NOTE(review): nothing in the visible code guards these with a mutex or
// std::atomic; QueueTs is presumably a thread-safe queue (name suggests it) -
// confirm against QueueTs.h.
// ---------------------------------------------------------------------------

// Data writer for Aux2Strings samples; not referenced by the code in this file
// (presumably assigned/used by DataWriter_Aux2Strings - verify).
Mri::Aux2StringsDataWriter_var writer_global_aux2strings;
// Data writer used by sendV2X() to publish V2X messages.
Mri::V2XMessageDataWriter_var writer_global_v2xmessage;
// Id of a vehicle that vehsMapThread() should erase from vehs_map; -1 means
// "nothing to remove". vehsMapThread() sets it to -1 on start and after each
// removal attempt.
long veh_id_to_remove;
// Incoming vehicle samples consumed by vehsMapThread().
QueueTs<Mri::VehData> vehdata_queue;
// Latest known data per vehicle, keyed by vehicle_id; maintained by vehsMapThread().
std::map<long, Mri::VehData> vehs_map;
// Set to true by OpenDDSThread() to ask the worker loops to stop.
bool finish_application;
// Incoming V2X messages consumed by v2xMapThread().
QueueTs<Mri::V2XMessage> v2x_queue;
/// Polls the console keyboard without blocking.
///
/// @param c  Receives the character read with _getch() when a key is pending;
///           left untouched otherwise.
/// @return   true if a key press was available and stored in *c, false if not.
bool getInput(char *c)
{
    const bool key_pending = (_kbhit() != 0);
    if (!key_pending)
    {
        // Nothing waiting in the console input buffer.
        return false;
    }

    // A key is waiting: consume it without echoing it to the console.
    *c = _getch();
    return true;
}
void sendV2X(long sender_id, long sender_timestamp, string message) {
Mri::V2XMessage v2x;
v2x.sender_id = sender_id;
v2x.sender_timestamp = sender_timestamp;
v2x.message = message.c_str();
v2x.recipient_id = -1;
v2x.recipient_timestamp = -1;
int success = writer_global_v2xmessage->write(v2x, DDS::HANDLE_NIL);
if (success != DDS::RETCODE_OK) {
//ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: TimeSync send message write returned %d.\n"), success));
throw std::string("ERROR: SendV2X message failed");
}
else
{
//cout << endl <<" ***** SEND V2X message:" << v2x.message << " sender_id=" << v2x.sender_id << " sender_timestamp=" << v2x.sender_timestamp << endl;
}
}
void vehsMapThread() {
Mri::VehData _veh;
//std::map<long, Mri::VehData> vehs_map;
std::map<long, Mri::VehData>::iterator it;
veh_id_to_remove = -1; // initial value
while (!finish_application)
{
//wait for something at the queue
vehdata_queue.pop(_veh);
it = vehs_map.find(_veh.vehicle_id);
if (it != vehs_map.end()) {
//there is a car with id = _veh.vehicle_id
//check if new data is older than in vehs_map
if (_veh.timestamp > it->second.timestamp)
{
//update data in vehs_map
vehs_map[it->first] = _veh;
}
else
{
cout << "OpenDDS data is older than data in vehs_list. Veh_id=" << _veh.vehicle_id
<< " timestamp_vehs_list=" << it->second.timestamp
<< " timestamp_OpenDDS=" << _veh.timestamp << endl;
}
}
else
{
//no car with id = _veh.vehicle_id
vehs_map.emplace(_veh.vehicle_id, _veh);
}
//check if we have to remove a car
if (veh_id_to_remove!=-1)
{
it = vehs_map.find(veh_id_to_remove);
veh_id_to_remove = -1; //reset this variable
if (it != vehs_map.end())
{
vehs_map.erase(it);
//cout << endl << "####################################################################################" << endl;
cout << endl << "####### Removed from vehsMap vehicle_id =" << veh_id_to_remove << " #####" << endl<<endl;
}
//veh_id_to_remove = -1; //reset this variable
}
}
}
void v2xMapThread() {
// thread where app receives v2x messages from NS-3
Mri::V2XMessage _v2x;
while (!finish_application)
{
Mri::VehData subjectCar1;
Mri::VehData _veh;
float distance = -1;
//wait for something at the queue
v2x_queue.pop(_v2x);
{
//we do not process v2x dnpw => moved to opendds3 unity client
//_veh = readVehDatafromString((string)_v2x.message);
//subjectCar1 = vehs_map[1];
//
//if (subjectCar1.vehicle_id!= _veh.vehicle_id)
//{
// distance = doNotPassWarning(subjectCar1.position_x, subjectCar1.position_y, subjectCar1.orient_heading, _veh.position_x, _veh.position_y, _veh.orient_heading);
// if (distance>0 && distance <160)
// {
// cout << endl <<" *** WARNING distance= " << distance << " ***" << endl << endl;
// sendDNPWMessage(distance, 5); // 5 is an app id of OpenDS 1
// }
//}
}
}
}
void OpenDDSThread(int argc, char* argv[]){
// start thread
/*int argc;
char* argv[]={ "-DCPSConfigFile","rtps.ini" };*/
char key = ' ';
DDS::DomainId_t DomainID{ 246 };
const char * topic_traffic_vehs_name = "Mri_TrafficVeh";
const char * topic_subject_car_name = "Mri_SubjectCar";
try
{
std::thread threadTimestamp(TimestampThread);
// Initialize DomainParticipantFactory
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
// Create DomainParticipant
DDS::DomainParticipant_var participant =
dpf->create_participant(DomainID,
PARTICIPANT_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(participant.in())) {
cerr << "Participant: Failed to create participant..." << endl;
}
else {
cout << "Participant: participant created successfully" << endl;
}
// Create Subscriber
DDS::Subscriber_var subscriber = participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil(),
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(subscriber.in())) {
cerr << "create_subscriber failed." << endl;
}
else {
cout << "SubscriberClass: subscriber created correctly" << endl;
}
// Create Publisher
DDS::Publisher_var publisher = participant->create_publisher(
PUBLISHER_QOS_DEFAULT,
DDS::PublisherListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK
);
if (CORBA::is_nil(publisher.in())) {
cerr << "create_publisher failed." << endl;
}
else {
cout << "PublisherClass: publisher created correctly" << endl;
}
// run timestamp synchronization
if (!TimeSynchronization(participant, subscriber, publisher))
{
//there was a problem with time synchronization
cout << "#### THERE WAS A PROBLEM WITH TIMESTAMP SYNCHRONIZATION. PROGRAM WILL CLOSE... ####" << endl;
// Clean-up!
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();
threadTimestamp.detach();
finish_application = true;
Sleep(5000);
exit(-1);
}
//Timestamps synchronized !
cout << "Timestamp synchronized. t=" << GetTimestamp() << endl << endl;
cout << "To quit press 'q'" << endl << endl;
//create reader to receive V2X message Mri_V2XfromNS3 Mri_V2XtoNS3
//DataReader_V2XMessage reader_v2xmessage(participant, subscriber, "Mri_V2XfromNS3");
// writer
DataWriter_V2XMessage writer_v2xMessage(participant, publisher, "Mri_V2XtoNS3");
// topics: Mri_TrafficVeh Mri_SubjectCar
//WARNING: both readers uses the same "on_data_available" code in DataReaderListenerImpl_VehData
//we can do this, because formats of the messages are the same
DataReader_VehData reader_traffic_vehs(participant, subscriber, "Mri_TrafficVeh");
// 8/3/2018 rd disabled datareader, subjectCar sends v2x BSM messages independently
//DataReader_VehData reader_subject_car(participant, subscriber, "Mri_SubjectCar");
//// to send dnpw message we need:
//DataWriter_Aux2Strings sender(participant, publisher, "Mri_Control");
while (key != 'q')
{
//Sleep(200);
getInput(&key);
}
//close thread
finish_application = true; //it breaks waiting "while .. " and close app
threadTimestamp.detach();
Sleep(2500);
// Clean-up!
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();
}
catch (const CORBA::Exception& e) {
e._tao_print_exception("Exception caught in OpenDDSThread:");
}
}