Skip to content

Commit

Permalink
websocket_frame_parser: Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lhmouse committed Jan 22, 2024
1 parent c980b32 commit f6d977a
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 61 deletions.
8 changes: 4 additions & 4 deletions poseidon/base/deflator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ deflate(chars_view data)
if(data.n == 0)
return 0;

int err;
const char* in_ptr = data.p;
const char* in_end = in_ptr + data.n;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down Expand Up @@ -66,9 +66,9 @@ bool
Deflator::
sync_flush()
{
int err;
const char* in_ptr = "";
const char* in_end = in_ptr;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down Expand Up @@ -100,9 +100,9 @@ bool
Deflator::
full_flush()
{
int err;
const char* in_ptr = "";
const char* in_end = in_ptr;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down Expand Up @@ -134,9 +134,9 @@ bool
Deflator::
finish()
{
int err;
const char* in_ptr = "";
const char* in_end = in_ptr;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down
7 changes: 5 additions & 2 deletions poseidon/base/inflator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ size_t
Inflator::
inflate(chars_view data)
{
if(data.n == 0)
return 0;

int err;
const char* in_ptr = data.p;
const char* in_end = in_ptr + data.n;
int err;

do {
constexpr size_t out_request = 128;
Expand Down Expand Up @@ -62,9 +65,9 @@ bool
Inflator::
finish()
{
int err;
const char* in_ptr = "";
const char* in_end = in_ptr;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down
16 changes: 6 additions & 10 deletions poseidon/http/websocket_deflator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ void
WebSocket_Deflator::
deflate_message_stream(plain_mutex::unique_lock& lock, chars_view data)
{
lock.lock(this->m_def_mtx);

if(data.n == 0)
return;

lock.lock(this->m_def_mtx);
int err;
const char* in_ptr = data.p;
const char* in_end = in_ptr + data.n;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand All @@ -62,10 +61,9 @@ WebSocket_Deflator::
deflate_message_finish(plain_mutex::unique_lock& lock)
{
lock.lock(this->m_def_mtx);

int err;
const char* in_ptr = "";
const char* in_end = in_ptr;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand All @@ -90,14 +88,13 @@ void
WebSocket_Deflator::
inflate_message_stream(plain_mutex::unique_lock& lock, chars_view data)
{
lock.lock(this->m_inf_mtx);

if(data.n == 0)
return;

lock.lock(this->m_inf_mtx);
int err;
const char* in_ptr = data.p;
const char* in_end = in_ptr + data.n;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand All @@ -120,10 +117,9 @@ WebSocket_Deflator::
inflate_message_finish(plain_mutex::unique_lock& lock)
{
lock.lock(this->m_inf_mtx);

int err;
const char* in_ptr = "\x00\x00\xFF\xFF";
const char* in_end = in_ptr + 4;
int err;

do {
// Allocate an output buffer and write compressed data there.
Expand Down
46 changes: 27 additions & 19 deletions poseidon/http/websocket_frame_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,14 @@ parse_frame_header_from_stream(linear_buffer& data)
if(this->m_wsf >= wsf_header_done)
return;

if(this->m_msg_fin) {
if(this->m_fin) {
// If a previous message has finished, forget it before a next frame. It
// is important for control frames to not touch `m_msg_*` fields.
this->m_msg_fin = 0;
this->m_msg_rsv1 = 0;
this->m_msg_rsv2 = 0;
this->m_msg_rsv3 = 0;
this->m_msg_opcode = 0;
// is important for control frames to not touch `m_*` fields.
this->m_fin = 0;
this->m_rsv1 = 0;
this->m_rsv2 = 0;
this->m_rsv3 = 0;
this->m_opcode = 0;
}

// Calculate the length of this header.
Expand Down Expand Up @@ -470,7 +470,7 @@ parse_frame_header_from_stream(linear_buffer& data)
case 1: // text data
case 2: // binary data
{
if(this->m_msg_opcode != 0) {
if(this->m_opcode != 0) {
// The previous message must have terminated.
this->m_wsf = wsf_error;
this->m_error_desc = "continuation frame expected";
Expand All @@ -492,14 +492,15 @@ parse_frame_header_from_stream(linear_buffer& data)
return;
}

// Copy message header fields for later use.
this->m_msg_fin = this->m_frm_header.fin;
this->m_msg_rsv1 = this->m_frm_header.rsv1;
this->m_msg_rsv2 = this->m_frm_header.rsv2;
this->m_msg_rsv3 = this->m_frm_header.rsv3;
this->m_msg_opcode = this->m_frm_header.opcode;
// Copy fields for later use.
this->m_fin = this->m_frm_header.fin;
this->m_rsv1 = this->m_frm_header.rsv1;
this->m_rsv2 = this->m_frm_header.rsv2;
this->m_rsv3 = this->m_frm_header.rsv3;
this->m_opcode = this->m_frm_header.opcode;
POSEIDON_LOG_TRACE(("WebSocket data frame: opcode = $1, rsv1 = $2"), this->m_opcode, this->m_rsv1);
break;
}
break;

case 0: // continuation
{
Expand All @@ -510,18 +511,22 @@ parse_frame_header_from_stream(linear_buffer& data)
return;
}

if(this->m_msg_opcode == 0) {
if(this->m_opcode == 0) {
// A continuation frame must follow a data frame.
this->m_wsf = wsf_error;
this->m_error_desc = "dangling continuation frame";
return;
}

// If this is a FIN frame, terminate the current message.
// If this is a FIN frame, terminate the current message.
if(mask_len_rsv_opcode & 0b10000000)
this->m_msg_fin = 1;
this->m_fin = 1;

// There shall be a message payload.
POSEIDON_LOG_TRACE(("WebSocket data continuation: opcode = $1"), this->m_opcode);
break;
}
break;

case 8: // close
case 9: // ping
Expand Down Expand Up @@ -552,8 +557,11 @@ parse_frame_header_from_stream(linear_buffer& data)
this->m_error_desc = "control frame not fragmentable";
return;
}

// There shall be a message payload.
POSEIDON_LOG_TRACE(("WebSocket control frame: opcode = $1"), this->m_opcode);
break;
}
break;

default:
// Reject this frame with an unknown opcode.
Expand Down
20 changes: 10 additions & 10 deletions poseidon/http/websocket_frame_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class WebSocket_Frame_Parser
// 2
WSF_State m_wsf;
// 3
uint8_t m_msg_opcode : 4;
uint8_t m_msg_rsv3 : 1;
uint8_t m_msg_rsv2 : 1;
uint8_t m_msg_rsv1 : 1;
uint8_t m_msg_fin : 1;
uint8_t m_opcode : 4;
uint8_t m_rsv3 : 1;
uint8_t m_rsv2 : 1;
uint8_t m_rsv1 : 1;
uint8_t m_fin : 1;
};
};

Expand Down Expand Up @@ -179,23 +179,23 @@ class WebSocket_Frame_Parser
// returned.
bool
message_fin() const noexcept
{ return this->m_msg_fin; }
{ return this->m_fin; }

bool
message_rsv1() const noexcept
{ return this->m_msg_rsv1; }
{ return this->m_rsv1; }

bool
message_rsv2() const noexcept
{ return this->m_msg_rsv2; }
{ return this->m_rsv2; }

bool
message_rsv3() const noexcept
{ return this->m_msg_rsv3; }
{ return this->m_rsv3; }

uint8_t
message_opcode() const noexcept
{ return this->m_msg_opcode; }
{ return this->m_opcode; }

// Clears the current complete frame, so the parser can start the next one.
void
Expand Down
16 changes: 12 additions & 4 deletions poseidon/socket/ws_client_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,16 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
switch(this->m_parser.frame_header().opcode) {
case 0: // CONTINUATION
case 1: // TEXT
case 2: { // BINARY
case 2: // BINARY
{
auto opcode = static_cast<WebSocket_OpCode>(this->m_parser.message_opcode());
ROCKET_ASSERT(is_any_of(opcode, { websocket_text, websocket_binary }));
this->do_on_ws_message_finish(opcode, move(this->m_msg));
break;
}

case 8: { // CLOSE
case 8: // CLOSE
{
ROCKET_ASSERT(this->m_closure_notified == false);
this->m_closure_notified = true;
data.clear();
Expand All @@ -199,7 +201,8 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
return;
}

case 9: { // PING
case 9: // PING
{
POSEIDON_LOG_TRACE(("WebSocket PING from `$1`: $2"), this->remote_address(), payload);
this->do_on_ws_message_finish(websocket_ping, move(payload));

Expand All @@ -208,7 +211,8 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
break;
}

case 10: { // PONG
case 10: // PONG
{
POSEIDON_LOG_TRACE(("WebSocket PONG from `$1`: $2"), this->remote_address(), payload);
this->do_on_ws_message_finish(websocket_pong, move(payload));
break;
Expand Down Expand Up @@ -304,6 +308,7 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
switch(opcode) {
case 1: // TEXT
case 2: // BINARY
{
// Check whether the message should be compressed. We believe that small
// (including empty) messages are not worth compressing.
if(this->m_pmce_opt) {
Expand Down Expand Up @@ -338,9 +343,11 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
// Send the message uncompressed.
// FIN + opcode
return this->do_ws_send_raw_frame(0b10000000 | opcode, data);
}

case 9: // PING
case 10: // PONG
{
// Control messages can't be fragmented.
if(data.n > 125)
POSEIDON_THROW((
Expand All @@ -351,6 +358,7 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
// Control messages can't be compressed, so send it as is.
// FIN + opcode
return this->do_ws_send_raw_frame(0b10000000 | opcode, data);
}

default:
POSEIDON_THROW((
Expand Down
16 changes: 12 additions & 4 deletions poseidon/socket/ws_server_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,16 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
switch(this->m_parser.frame_header().opcode) {
case 0: // CONTINUATION
case 1: // TEXT
case 2: { // BINARY
case 2: // BINARY
{
auto opcode = static_cast<WebSocket_OpCode>(this->m_parser.message_opcode());
ROCKET_ASSERT(is_any_of(opcode, { websocket_text, websocket_binary }));
this->do_on_ws_message_finish(opcode, move(this->m_msg));
break;
}

case 8: { // CLOSE
case 8: // CLOSE
{
ROCKET_ASSERT(this->m_closure_notified == false);
this->m_closure_notified = true;
data.clear();
Expand All @@ -238,7 +240,8 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
return;
}

case 9: { // PING
case 9: // PING
{
POSEIDON_LOG_TRACE(("WebSocket PING from `$1`: $2"), this->remote_address(), payload);
this->do_on_ws_message_finish(websocket_ping, move(payload));

Expand All @@ -247,7 +250,8 @@ do_on_http_upgraded_stream(linear_buffer& data, bool eof)
break;
}

case 10: { // PONG
case 10: // PONG
{
POSEIDON_LOG_TRACE(("WebSocket PONG from `$1`: $2"), this->remote_address(), payload);
this->do_on_ws_message_finish(websocket_pong, move(payload));
break;
Expand Down Expand Up @@ -338,6 +342,7 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
switch(opcode) {
case 1: // TEXT
case 2: // BINARY
{
// Check whether the message should be compressed. We believe that small
// (including empty) messages are not worth compressing.
if(this->m_pmce_opt) {
Expand Down Expand Up @@ -372,9 +377,11 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
// Send the message uncompressed.
// FIN + opcode
return this->do_ws_send_raw_frame(0b10000000 | opcode, data);
}

case 9: // PING
case 10: // PONG
{
// Control messages can't be fragmented.
if(data.n > 125)
POSEIDON_THROW((
Expand All @@ -385,6 +392,7 @@ ws_send(WebSocket_OpCode opcode, chars_view data)
// Control messages can't be compressed, so send it as is.
// FIN + opcode
return this->do_ws_send_raw_frame(0b10000000 | opcode, data);
}

default:
POSEIDON_THROW((
Expand Down
Loading

0 comments on commit f6d977a

Please sign in to comment.