Skip to content

Commit

Permalink
Fix network framer handling
Browse files Browse the repository at this point in the history
This should fix those big delays
  • Loading branch information
tmolitor-stud-tu committed Jan 27, 2024
1 parent 7663e02 commit 4b3f872
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions Monal/Classes/MLStream.m
Original file line number Diff line number Diff line change
Expand Up @@ -183,38 +183,54 @@ -(NSStreamStatus) streamStatus

-(void) schedule_read
{
if(self.closed || !self.open_called || !self.shared_state.open)
{
DDLogVerbose(@"ignoring schedule_read call because connection is closed: %@", self);
return;
}

//don't call nw_connection_receive() while our framer is active, this will result in multiple calls in parallel
//(one call for every read: call) and cause reordering and truncation problems
@synchronized(self.shared_state) {
if(self.shared_state.framer != nil)
if(self.closed || !self.open_called || !self.shared_state.open)
{
DDLogVerbose(@"Framer active, not scheduling nw_connection_receive() read...");
DDLogVerbose(@"ignoring schedule_read call because connection is closed: %@", self);
return;
}

//don't call nw_connection_receive() twice: this will introduce race conditions
if(_reading)
{
DDLogWarn(@"Not calling nw_connection_receive: already reading!");
//TODO: does this ever happen??
unreachable(@"Not calling nw_connection_receive: already reading!");
}
_reading = YES;

if(self.shared_state.framer != nil)
{
DDLogVerbose(@"dispatching async call to nw_framer_parse_input into framer queue");
nw_framer_async(self.shared_state.framer, ^{
DDLogVerbose(@"now calling nw_framer_parse_input inside framer queue");
nw_framer_parse_input(self.shared_state.framer, 1, BUFFER_SIZE, nil, ^size_t(uint8_t* buffer, size_t buffer_length, bool is_complete) {
DDLogVerbose(@"nw_framer_parse_input got callback with is_complete:%@, length=%zu", bool2str(is_complete), (unsigned long)buffer_length);
self.incoming_data_handler([NSData dataWithBytes:buffer length:buffer_length], is_complete, nil);
return buffer_length;
});
});
}
else
{
DDLogVerbose(@"calling nw_connection_receive");
nw_connection_receive(self.shared_state.connection, 1, BUFFER_SIZE, ^(dispatch_data_t content, nw_content_context_t context __unused, bool is_complete, nw_error_t receive_error) {
DDLogVerbose(@"nw_connection_receive got callback with is_complete:%@, receive_error=%@, length=%zu", bool2str(is_complete), receive_error, (unsigned long)((NSData*)content).length);
NSError* st_error = nil;
if(receive_error)
st_error = (NSError*)CFBridgingRelease(nw_error_copy_cf_error(receive_error));
self.incoming_data_handler((NSData*)content, is_complete, st_error);
});
}
}

_reading = YES;
DDLogVerbose(@"calling nw_connection_receive");
nw_connection_receive(self.shared_state.connection, 1, BUFFER_SIZE, ^(dispatch_data_t content, nw_content_context_t context __unused, bool is_complete, nw_error_t receive_error) {
DDLogVerbose(@"nw_connection_receive got callback with is_complete:%@, receive_error=%@, length=%zu", bool2str(is_complete), receive_error, (unsigned long)((NSData*)content).length);
NSError* st_error = nil;
if(receive_error)
st_error = (NSError*)CFBridgingRelease(nw_error_copy_cf_error(receive_error));
self.incoming_data_handler((NSData*)content, is_complete, st_error);
});
}

-(void) generateEvent:(NSStreamEvent) event
{
@synchronized(self.shared_state) {
[super generateEvent:event];
if(event == NSStreamEventOpenCompleted && self.open_called && self.shared_state.open)
//don't call schedule_read if a framer is active: the framer will call it itself
if(event == NSStreamEventOpenCompleted && self.open_called && self.shared_state.open && self.shared_state.framer == nil)
[self schedule_read];
}
}
Expand Down Expand Up @@ -463,13 +479,10 @@ +(void) connectWithSNIDomain:(NSString*) SNIDomain connectHost:(NSString*) host
}

nw_framer_set_input_handler(framer, ^size_t(nw_framer_t framer) {
nw_framer_parse_input(framer, 1, BUFFER_SIZE, nil, ^size_t(uint8_t* buffer, size_t buffer_length, bool is_complete) {
//pass data on to our input stream providing the NSStream api
input.incoming_data_handler([NSData dataWithBytes:buffer length:buffer_length], is_complete, nil);
return buffer_length;
});
return 0; //why that?
[input schedule_read];
return 0; //why that??
});

shared_state.framer = framer;
return nw_framer_start_result_will_mark_ready;
});
Expand All @@ -482,7 +495,7 @@ +(void) connectWithSNIDomain:(NSString*) SNIDomain connectHost:(NSString*) host
//create and configure connection object
nw_endpoint_t endpoint = nw_endpoint_create_host([host cStringUsingEncoding:NSUTF8StringEncoding], [[port stringValue] cStringUsingEncoding:NSUTF8StringEncoding]);
nw_connection_t connection = nw_connection_create(endpoint, parameters);
nw_connection_set_queue(connection, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
nw_connection_set_queue(connection, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0));

//configure shared state
shared_state.connection = connection;
Expand Down

0 comments on commit 4b3f872

Please sign in to comment.