diff --git a/Monal/Classes/MLStream.m b/Monal/Classes/MLStream.m index b38cce82a..2112a087f 100644 --- a/Monal/Classes/MLStream.m +++ b/Monal/Classes/MLStream.m @@ -50,7 +50,7 @@ @interface MLInputStream() //(mutexes can not be unlocked in a thread different from the one it got locked in and NSLock internally uses mutext --> both can not be used) dispatch_semaphore_t _read_sem; } -@property (atomic, readonly) void (^incoming_data_handler)(NSData* _Nullable, BOOL, NSError* _Nullable); +@property (atomic, readonly) void (^incoming_data_handler)(NSData* _Nullable, BOOL, NSError* _Nullable, BOOL); @end @interface MLOutputStream() @@ -89,7 +89,7 @@ -(instancetype) initWithSharedState:(MLSharedStreamState*) shared //this handler will be called by the schedule_read method //since the framer swallows all data, nw_connection_receive() and the framer cannot race against each other and deliver reordered data weakify(self); - _incoming_data_handler = ^(NSData* _Nullable content, BOOL is_complete, NSError* _Nullable st_error) { + _incoming_data_handler = ^(NSData* _Nullable content, BOOL is_complete, NSError* _Nullable st_error, BOOL polling_active) { strongify(self); if(self == nil) return; @@ -142,7 +142,7 @@ -(instancetype) initWithSharedState:(MLSharedStreamState*) shared [self generateEvent:NSStreamEventEndEncountered]; //try to read again - if(!is_complete && !generate_bytes_available_event) + if(!is_complete && !generate_error_event && !generate_bytes_available_event && polling_active) [self schedule_read]; }; return self; @@ -235,7 +235,8 @@ -(void) schedule_read DDLogDebug(@"now calling nw_framer_parse_input inside framer queue"); nw_framer_parse_input(self.shared_state.framer, 1, BUFFER_SIZE, nil, ^size_t(uint8_t* buffer, size_t buffer_length, bool is_complete) { DDLogDebug(@"nw_framer_parse_input got callback with is_complete:%@, length=%zu", bool2str(is_complete), (unsigned long)buffer_length); - self.incoming_data_handler([NSData dataWithBytes:buffer length:buffer_length], is_complete, nil); + //we don't want to do "polling" here, our next nw_framer_parse_input will be triggered by the nw_framer_set_input_handler block + self.incoming_data_handler([NSData dataWithBytes:buffer length:buffer_length], is_complete, nil, NO); return buffer_length; }); }); @@ -248,7 +249,8 @@ -(void) schedule_read NSError* st_error = nil; if(receive_error) st_error = (NSError*)CFBridgingRelease(nw_error_copy_cf_error(receive_error)); - self.incoming_data_handler((NSData*)content, is_complete, st_error); + //we want to do "polling" here (e.g. start our next blocking nw_connection_receive call if we did not receive new data nor any error) + self.incoming_data_handler((NSData*)content, is_complete, st_error, YES); }); } }