-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhdfsdumpreader.cpp
181 lines (157 loc) · 5.94 KB
/
hdfsdumpreader.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#include "hdfsdumpreader.h"
#include <assert.h>
#include <zlib.h>
#include <iostream>
/**********************************************************************/
/** Reads an int written by Java's DataOutput#writeInt.
 *
 * Consumes sizeof(int32_t) bytes from @p data and combines them with the
 * first byte as the most significant one (big-endian order).
 *
 * @param data Buffer to consume from; its read position is advanced by
 *             data->read(). Whatever read() does when fewer than four
 *             bytes are available is passed on to the caller unchanged.
 * @return The decoded 32-bit signed value.
 *
 * FIXME Move to proper namespace
 */
inline int32_t ReadInt(tmacam::filebuf* data) {
    assert(sizeof(int32_t) == 4);
    const char* bytes = data->read(sizeof(int32_t));
    // Assemble the value in an unsigned type: shifting a signed int so that
    // a bit lands in the sign position (e.g. 0xff << 24) is not well defined
    // in every C++ revision, whereas unsigned shifts always are.
    const uint32_t value =
        (static_cast<uint32_t>(bytes[0] & 0xff) << 24) |
        (static_cast<uint32_t>(bytes[1] & 0xff) << 16) |
        (static_cast<uint32_t>(bytes[2] & 0xff) << 8) |
        static_cast<uint32_t>(bytes[3] & 0xff);
    return static_cast<int32_t>(value);
}
/**********************************************************************/
namespace tmacam {
/** Builds a reader for the dump stored at @p path.
 *
 * @param fs          Filesystem handle; kept in fs_ and used to open the
 *                    file and to query its metadata.
 * @param path        Location of the dump file inside @p fs.
 * @param buffer_size Size, in bytes, of the internal read buffer. The same
 *                    value is also forwarded to the file's constructor.
 */
HdfsDumpReader::HdfsDumpReader(hdfs::FileSystem* fs, const char* path,
                               size_t buffer_size)
    : fs_(fs),
      path_(path),
      file_(*fs, path, O_RDONLY, buffer_size, 0, 0),
      file_size_(0),  // placeholder -- the real size is filled in below
      buffer_size_(buffer_size),
      buffer_(buffer_size),
      bytes_read_(0),
      available_data_(),
      empty_file_buffer_()
{
    // Look up the file's metadata once so that HasNext()/GetNext() know
    // how many bytes there are to consume in total.
    hdfs::FileInfoList path_info;
    fs_->GetPathInfo(path, &path_info);
    file_size_ = path_info->mSize;
}
/** Tells whether GetNext() still has something to work on.
 *
 * @return true while part of the file has not been read yet, or while
 *         available_data_ still holds unconsumed bytes.
 */
bool HdfsDumpReader::HasNext()
{
    // Bytes still sitting in the file: certainly not done yet.
    if (bytes_read_ < file_size_) {
        return true;
    }
    // The whole file was read; we are done only once the buffered data
    // has been consumed as well.
    return !available_data_.eof();
}
/** Returns the next packet (length header + payload + CRC trailer).
 *
 * The returned filebuf starts at the packet's length header and spans the
 * whole packet. It is built from a pointer into buffer_, so
 * (NOTE(review): assuming filebuf does not copy the bytes -- confirm) it
 * should be consumed before the next call refills the buffer.
 *
 * Must only be called while HasNext() is true.
 *
 * Terminates the process with EXIT_FAILURE when the data cannot be
 * decoded: CRC mismatch (unless DUMPREADER_FAST_PASS is defined), a payload
 * length that is negative or does not fit the buffer, a read that yields
 * no data, or a file that ends in the middle of a packet.
 */
tmacam::filebuf HdfsDumpReader::GetNext()
{
    /* First of all, remember that this code originally was something like
     * this:
     *
     *  for (bytes_read_ = 0; bytes_read_ < file_size_;) {
     *      (...)
     *      try {
     *          while(!available_data_.eof()) {
     *              (...)
     *              // yield data unit
     *          }
     *      } catch (std::out_of_range) {
     *          (...)
     *      }
     *
     * Missing closures now?
     *
     * Using Pread() and exceptions for doing this is ugly and lame.
     * We should have used Read() and proper tests but, you know what,
     * this works and is easy to understand -- and this scores +1 in
     * my book.
     */
    assert(HasNext());  // Simplify corner cases for the next if

    // Do we need to perform a read operation? Can we perform a read?
    if (available_data_.eof() && (bytes_read_ < file_size_)) {
        tSize read_length = file_.Pread(bytes_read_, &buffer_[0], buffer_size_);
        // A read that yields nothing can never make progress, and a
        // negative length must not be added to bytes_read_ or used as a
        // buffer size.
        // NOTE(review): assumes Pread() reports failure through its return
        // value; if it throws instead, this branch is simply never taken.
        if (read_length <= 0) {
            std::cerr << "READ FAILED -- Pread returned " << read_length <<
                " at offset " << bytes_read_ <<
                std::endl;
            exit(EXIT_FAILURE);
        }
        bytes_read_ += read_length;
        available_data_ = tmacam::filebuf(&buffer_[0], read_length);
    }

    // Save current progress so we can rewind to the start of this packet
    const size_t data_left_len = available_data_.len();
    try {
        // Try to read a full pkt (hdr + payload + crc trailer)
        assert(data_left_len);
        const char* pkt_start = available_data_.current;

        // Read header, payload and CRC. Abort if payload is too big.
        int32_t payload_len = ReadInt(&available_data_);
        // The sign is checked separately: a length in [-8, -1] would wrap
        // around in the unsigned sum below and look like a tiny payload.
        const bool payload_fits = (payload_len >= 0) &&
            (static_cast<size_t>(payload_len) + 2*sizeof(int32_t) <
                buffer_size_);
        assert(payload_fits);
        // Enforced in release builds too: a packet that can never fit in
        // the buffer would otherwise be retried forever.
        if (!payload_fits) {
            std::cerr << "INVALID PAYLOAD LENGTH -- found " << payload_len <<
                std::endl;
            exit(EXIT_FAILURE);
        }
        const char* payload_data = available_data_.read(payload_len);
        int32_t expected_checksum = ReadInt(&available_data_);
#ifndef DUMPREADER_FAST_PASS
        // Calc CRC32
        uLong crc = crc32(0L, reinterpret_cast<const Bytef*>(payload_data),
                          payload_len);
        if (expected_checksum != static_cast<int32_t>(crc)) {
            std::cerr << "CRC MISSMATCH -- found " << crc <<
                " expected" << expected_checksum <<
                std::endl;
            exit(EXIT_FAILURE);
        }
#endif
        // Ok, pkt content was read and is sane. Return it.
        size_t pkt_len = data_left_len - available_data_.len();
        return tmacam::filebuf(pkt_start, pkt_len);
    } catch (const std::out_of_range&) {
        // Ooops! Not enough data in the buffer for a whole packet...
        if (bytes_read_ >= file_size_) {
            // Everything up to the end of the file is already buffered, so
            // reading again would return the very same bytes and fail the
            // very same way: the file ends in the middle of a packet.
            std::cerr << "TRUNCATED PACKET -- " << data_left_len <<
                " trailing bytes do not form a complete packet" <<
                std::endl;
            exit(EXIT_FAILURE);
        }
        // rewind reading position to the start of this packet and reset
        // available_data_ so the retry reads a fresh buffer from there
        bytes_read_ -= data_left_len;
        available_data_ = empty_file_buffer_;
        // and retry
        return GetNext();
    }
}
// Code left for historical and documentational purposes.
/*
* void HdfsDumpReader::ReadFile()
* {
* size_t data_left_len = 0;
*
* for (bytes_read_ = 0; bytes_read_ < file_size_;) {
* !* Using Pread() and exceptions for doing this is ugly and lame.
* * We should have used Read() and proper tests but, you know what,
* * this works and is easy to understand -- and this scores +1 in
* * my book.
* *!
* tSize read_length = file_.Pread(bytes_read_,
* &buffer_[0],
* buffer_size_);
* bytes_read_ += read_length;
*
* available_data_ = tmacam::filebuf(&buffer_[0], read_length);
*
* std::cout << "READ " << read_length << std::endl;
*
* try {
* while (!available_data_.eof()) {
* // Save current progress
* data_left_len = available_data_.len();
* // Read header, payload and CRC. Abort if payload is too big
* int32_t payload_len = ReadInt(&available_data_);
* assert(payload_len + 2*sizeof(int32_t) < buffer_size_);
* const char* payload_data = available_data_.read(payload_len);
* int32_t expected_checksum = ReadInt(&available_data_);
* #ifndef DUMPREADER_FAST_PASS
* // Calc CRC32
* uLong crc = crc32(0L,
* (const Bytef*)payload_data,
* payload_len);
* if (expected_checksum != static_cast<int32_t>(crc)) {
* std::cerr << "CRC MISSMATCH -- found " << crc <<
* " expected" << expected_checksum <<
* std::endl;
* exit(EXIT_FAILURE);
* }
* #endif
* std::cout << "P: " << payload_len << std::endl;
* }
* } catch(std::out_of_range) {
* std::cout << "ooops " << std::endl;
* // not enough data...
* // rewind reading position
* bytes_read_ -= data_left_len;
* }
* }
* }
*/
}; // namespace tmacam
// vim: et ai sts=4 ts=4 sw=4