-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathXR25streamreader.cc
96 lines (86 loc) · 3.18 KB
/
XR25streamreader.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/* XR25StreamReader.cc - Parse Renault XR25 frame stream
*
* Copyright (C) Javier Lopez-Gomez, 2016
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#include "XR25streamreader.hh"
#include <condition_variable>
#include <iomanip>
#include <mutex>
#include <tuple>
#include <type_traits>
/** Spawn the reader thread.
 *
 * Does nothing when a thread object already exists. Otherwise a new
 * std::thread is created that runs read_frames() with the given parser.
 *
 * @param parser The XR25FrameParser handed to read_frames(); it is captured
 *               by reference by the thread's entry function
 */
void XR25StreamReader::start(XR25FrameParser &parser) {
  if (_thrd)
    return;

  auto worker = [this, &parser]() { read_frames(parser); };
  _thrd = std::make_unique<std::thread>(worker);
}
/** Stop the reader thread, if there is one.
 *
 * Requests cancellation of the thread through pthread_cancel(), waits for it
 * to finish and then releases the thread object. Releasing it makes a
 * repeated stop() a no-op and lets start() (which only spawns a thread when
 * none exists) create a fresh reader afterwards.
 */
void XR25StreamReader::stop() {
  if (!_thrd)
    return;

  if (_thrd->joinable()) {
    pthread_cancel(_thrd->native_handle());
    _thrd->join();
  }
  // Once joined, the native handle is no longer valid; keeping the object
  // around would make a second stop() cancel a stale handle and make join()
  // throw std::system_error.
  _thrd.reset();
}
/** Frame received handler.
 *
 * Increments the received-frame counter, passes the frame to the parser and,
 * when a post-parse callback is set, invokes it with the same frame data.
 * If built with DEBUG defined, the frame is first written to std::cout as
 * its length in decimal followed by every octet in two-digit hexadecimal.
 *
 * @param parser The XR25FrameParser to use
 * @param c      Translated frame ("0xff 0xff" replaced by "0xff")
 * @param length Length of the frame in octets
 * @param fra    Reference to the parsed frame
 */
void XR25StreamReader::frame_recv(XR25FrameParser &parser, const unsigned char c[], int length, XR25Frame &fra) {
  ++_fra_count;

#ifdef DEBUG
  std::cout << "[" << std::dec << length << "] ";
  std::cout << std::hex << std::setfill('0');
  int idx = 0;
  while (idx < length) {
    std::cout << " " << std::setw(2) << static_cast<int>(c[idx]);
    ++idx;
  }
  std::cout << std::endl;
#endif

  parser.parse_frame(c, length, fra);

  if (!_post_parse)
    return;
  _post_parse(c, length, fra);
}
/** Reader thread body: split the input stream into frames.
 *
 * Octets are read from _in one at a time. The sequence "0xff 0x00" marks the
 * start of a frame and "0xff 0xff" is translated to a single 0xff octet.
 * Whenever a start-of-frame marker is seen while synchronized, the octets
 * accumulated so far are handed to frame_recv(). If a frame outgrows the
 * local buffer, synchronization is dropped and _sync_err_count is
 * incremented; octets are then ignored until the next start-of-frame marker.
 * Reading stops as soon as an octet cannot be extracted from _in.
 *
 * A helper thread stores the number of frames received during the last
 * second in _frames_per_sec. It is told to terminate and is joined by a
 * clean-up handler registered with pthread_cleanup_push(), which runs both
 * when the read loop ends and when this thread is cancelled.
 *
 * @param parser The XR25FrameParser passed on to frame_recv()
 */
void XR25StreamReader::read_frames(XR25FrameParser &parser) {
  unsigned char frame[128] = {0xff, 0x00};
  unsigned char *const frame_end = frame + sizeof(frame);
  unsigned char *p = &frame[1];
  XR25Frame fra{};

  std::condition_variable term;
  std::mutex term_m;
  bool terminating = false; // guarded by term_m
  std::atomic_int count(0);

  // thread that updates _frames_per_sec once a second
  std::thread stat_thread([&term, &term_m, &terminating, &count, this]() {
    std::unique_lock<std::mutex> lock(term_m);
    // Termination is a flag checked under the mutex rather than a bare
    // notification: a notify issued while this thread is not waiting cannot
    // be lost, and a spurious wake-up cannot end the thread early.
    // wait_for() returns false only when the second elapsed with the flag
    // still unset.
    while (!term.wait_for(lock, std::chrono::seconds(1), [&terminating]() { return terminating; }))
      this->_frames_per_sec = count.exchange(0);
  });

  // thread cancellation clean-up handler
  struct stat_shutdown_t {
    std::condition_variable &term;
    std::mutex &term_m;
    bool &terminating;
    std::thread &stat_thread;
  };
  stat_shutdown_t shutdown{term, term_m, terminating, stat_thread};
  // Kept as a named function pointer so that no lambda body (and none of its
  // commas) ends up inside the pthread_cleanup_push() macro argument list.
  void (*const stop_stat_thread)(void *) = [](void *arg) {
    auto *ctx = static_cast<stat_shutdown_t *>(arg);
    {
      std::lock_guard<std::mutex> lock(ctx->term_m);
      ctx->terminating = true;
    }
    ctx->term.notify_one();
    ctx->stat_thread.join();
  };
  pthread_cleanup_push(stop_stat_thread, &shutdown);

  for (;;) {
    const auto first = _in.get();
    if (_in.fail()) // end of stream or unusable stream: nothing was read
      break;
    unsigned char c = static_cast<unsigned char>(first);

    if (c == 0xff) {
      const auto second = _in.get();
      if (_in.fail())
        break;

      if (second == 0x00) { /* start of frame */
        if (_synchronized) {
          frame_recv(parser, frame, static_cast<int>(p - frame), fra);
          count++;
        }
        _synchronized = 1;
        p = &frame[1];
        c = 0x00; // stored below as frame[1], right after the fixed 0xff
      } else if (second != 0xff) {
        // Lone 0xff followed by an ordinary octet: keep the 0xff in c and
        // push the other octet back so it is read (and stored) exactly once
        // by the next iteration.
        _in.unget();
      }
      /* otherwise 'ff ff' was read and is translated to the 'ff' held in c */
    }

    if (_synchronized) {
      if (p < frame_end) {
        *p++ = c;
      } else {
        // frame does not fit in the buffer: drop it and wait for a new start
        _synchronized = 0;
        _sync_err_count++;
      }
    }
  }

  pthread_cleanup_pop(1);
}