Skip to content

Commit

Permalink
Split large data when using brpc streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaaaaaron committed Oct 9, 2022
1 parent f1d1cd5 commit 67b6b29
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
3 changes: 1 addition & 2 deletions docs/cn/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ Streaming RPC保证:
- 全双工。
- 支持流控。
- 提供超时提醒

目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
- 自动切割过大的消息

例子见[example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/)

Expand Down
3 changes: 1 addition & 2 deletions docs/en/streaming_rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ Streaming RPC ensures/provides:
- Full duplex
- Flow control
- Notification on timeout

We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation.
- support segment large messages automaticall

For examples please refer to [example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/).

Expand Down
27 changes: 19 additions & 8 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
namespace brpc {

DECLARE_bool(usercode_in_pthread);

DEFINE_uint64(max_trans_unit_size, 64 * 1024 * 1024,
"Maximum size of a transmission unit that we used to cut the message.");
const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

Stream::Stream()
Expand Down Expand Up @@ -140,20 +141,30 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
errno = EBADF;
return -1;
}
butil::IOBuf out;
ssize_t len = 0;
for (size_t i = 0; i < size; ++i) {
butil::IOBuf *data = data_list[i];
size_t length = data->length();
uint64_t trans_unit = FLAGS_max_trans_unit_size;
int packet_num = ceil((double)length / (double)trans_unit);

butil::IOBuf split_data;
for (int j = 0; j < packet_num; j++) {
butil::IOBuf out;
data->cutn(&split_data, trans_unit);
bool has_continuation = (j != packet_num - 1);
StreamFrameMeta fm;
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.set_frame_type(FRAME_TYPE_DATA);
// TODO: split large data
fm.set_has_continuation(false);
policy::PackStreamMessage(&out, fm, data_list[i]);
len += data_list[i]->length();
data_list[i]->clear();
fm.set_has_continuation(has_continuation);
policy::PackStreamMessage(&out, fm, &split_data);
WriteToHostSocket(&out);
len += (ssize_t)split_data.length();
split_data.clear();
}
data->clear();
}
WriteToHostSocket(&out);
return len;
}

Expand Down

0 comments on commit 67b6b29

Please sign in to comment.