forked from yeyouqun/xdeltalib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxdelta_server.h
186 lines (171 loc) · 5.77 KB
/
xdelta_server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// author:[email protected]
#ifndef __XDELTA_SERVER_H__
#define __XDELTA_SERVER_H__
/// @file
/// 声明网络服务端处理基类。
namespace xdelta {
/// \def XDELTA_DEFAULT_PORT
/// 默认服务器同步端口,用户可以指定其他端口。
#define XDELTA_DEFAULT_PORT 1366
/// \def XDELTA_ADDR
/// 本地服务地址
#define XDELTA_ADDR "127.0.0.1"
/// \def STACK_BUFF_LEN
/// 栈缓存空间大小,应该保持尽量小,同时又满足一次数据块头的读取。
#define STACK_BUFF_LEN 1024
/// \def NO_MULTI_ROUND_FILSIZE
/// 默认多轮 Hash 的文件尺寸大小,一般不会有文件超过这个大小,因此指定这个参数时,所有文件都只执行单轮 Hash。
#define NO_MULTI_ROUND_FILSIZE ((uint64_t)(-1))
/// \class
/// \brief
/// 本类用于观察同步执行流程,使用者需要自己派生相应的观察者类,以实现自己的统计或者其他相应的操作。
class xdelta_observer
{
public:
xdelta_observer () {}
virtual ~xdelta_observer () {}
/// \brief
/// 指示开始处理文件的 Hash 流
/// \param[in] fname 文件名,带相对路径
/// \param[in] blk_len 处理文件的块长度
/// \return 没有返回
virtual void start_hash_stream (const std::string & fname, const int32_t blk_len) = 0;
/// \brief
/// 指示结束一个文件的 Hash 流的处理。
/// \param[in] filsize 源文件的大小。
/// \return 没有返回
virtual void end_hash_stream (const uint64_t filesize) = 0;
/// \brief
/// 指示结束多轮 Hash 中第一轮,其相应结果相当于单轮 Hash 中的 end_hash_stream。
/// \param[in] file_hash 整个文件的 MD4 Hash 值。
/// \return 如果源文件中判断需要继续下一轮,则返回真,否则返回假。
virtual void end_first_round () = 0;
/// \brief
/// 指示下一轮 Hash 流开始。
/// \param[in] blk_len 下一轮 Hash 的块长度。
/// \return 没有返回
virtual void next_round (const int32_t blk_len) = 0;
/// \brief
/// 指示结束一轮 Hash,只在多轮 Hash 中调用
/// \return 没有返回
virtual void end_one_round () = 0;
/// \brief
/// 指示处理过程中的错误。
/// \param[in] errmsg 错误信息。
/// \param[in] errorno 错误码。
/// \return 没有返回
virtual void on_error (const std::string & errmsg, const int errorno) = 0;
/// \brief
/// 指示发送的字节数,可以用来进行数据统计。
/// \param[in] bytes 字节数。
/// \return 没有返回
virtual void bytes_send (const uint32_t bytes) = 0;
/// \brief
/// 指示接收的字节数,可以用来进行数据统计。
/// \param[in] bytes 字节数。
/// \return 没有返回
virtual void bytes_recv (const uint32_t bytes) = 0;
};
/// \class
/// \brief
/// 本类用于作为服务端接收同步数据,当执行 run 后,一个任务结果后才会返回。
class DLL_EXPORT xdelta_server
{
CPassiveSocket server_; ///< 服务器 Socket 对象。
uint64_t auto_multiround_filsize_;///< 多轮 Hash 的边界值。
bool inplace_; ///< 采用就地构造文件的方式同步。多轮 Hash 与就地构造不能同时出现。
void _start_task (file_operator & foperator
, xdelta_observer & observer
, uint16_t port);
public:
//
// @auto_multiround_filsize if file's size excess this size
// will used multiround xdelta.
//
xdelta_server ()
: server_ ()
, auto_multiround_filsize_ (NO_MULTI_ROUND_FILSIZE)
, inplace_ (false)
{
// less than this size will cause no difference with single round.
if (MULTIROUND_MAX_BLOCK_SIZE > auto_multiround_filsize_)
auto_multiround_filsize_ = NO_MULTI_ROUND_FILSIZE;
}
/// \brief
/// 指示同步任务的文件构造通过就地的方式进行。
/// \return 没有返回
void set_inplace ();
/// \brief
/// 设置需要进行多轮 Hash 同步的文件大小边界。如果没有设置,则默认不进行多轮 Hash。
/// \param[in] multi_round_size 执行多轮 Hash 同步的边界大小。
/// \return 没有返回
void set_multiround_size (uint64_t multi_round_size);
~xdelta_server () {}
/// \brief
/// 开始执行同步,同步完成后,本接口才会返回。
/// \param[in] foperator 文件操作器。
/// \param[in] observer 观察者对象。
/// \param[in] port 服务器端口。
/// \return 没有返回
void run (file_operator & foperator
, xdelta_observer & observer
, uint16_t port = XDELTA_DEFAULT_PORT);
};
inline void wait_to_exit (std::vector<thread*> & thrds)
{
std::for_each (thrds.begin (),thrds.end (), std::mem_fun (&thread::join));
std::for_each (thrds.begin (), thrds.end (), delete_obj<thread>);
thrds.clear ();
}
void init_passive_socket (CPassiveSocket & passive, uint16_t port);
void init_active_socket (CActiveSocket & active, const uchar_t * addr, uint16_t port);
inline void read_block (char_buffer<uchar_t> & buff
, CSimpleSocket & client
, uint32_t len
, xdelta_observer & observer)
{
if (len == 0)
return;
int32_t nbytes = client.Receive (buff, len);
if (nbytes <= 0) {
std::string errmsg = fmt_string ("Socket is broken or closed!");
THROW_XDELTA_EXCEPTION (errmsg);
}
observer.bytes_recv (len);
}
inline block_header read_block_header (CSimpleSocket & client, xdelta_observer & observer)
{
DEFINE_STACK_BUFFER (buff);
read_block (buff, client, BLOCK_HEAD_LEN, observer);
block_header header;
buff >> header;
return header;
}
inline void send_block (CSimpleSocket & socket, char_buffer<uchar_t> & data, xdelta_observer & observer)
{
uint32_t tbytes = data.data_bytes ();
if (tbytes > 0) {
uint32_t nbytes = socket.Send (data.rd_ptr (), tbytes);
if (nbytes != tbytes) {
std::string errmsg = fmt_string ("Send data to client failed.");
THROW_XDELTA_EXCEPTION (errmsg);
}
observer.bytes_send (tbytes);
}
data.reset ();
}
void buffer_or_send (CSimpleSocket & socket
, char_buffer<uchar_t> & stream_buff
, char_buffer<uchar_t> & header_buff
, char_buffer<uchar_t> & data_buff
, xdelta_observer & observer);
inline void streamize_data (CSimpleSocket & socket
, char_buffer<uchar_t> & header_buff
, char_buffer<uchar_t> & data_buff
, xdelta_observer & observer)
{
send_block (socket, header_buff, observer);
send_block (socket, data_buff, observer);
}
} //namespace xdelta
#endif //__XDELTA_SERVER_H__