Skip to content

Commit

Permalink
Shedule network events in dedicated runloop to not block main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
tmolitor-stud-tu committed Jan 27, 2024
1 parent 7bfb860 commit 7663e02
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 90 deletions.
5 changes: 5 additions & 0 deletions Monal/Classes/HelperTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ typedef NS_ENUM(NSUInteger, MLVersionType) {
MLVersionTypeLog,
};

typedef NS_ENUM(NSUInteger, MLRunLoopIdentifier) {
MLRunLoopIdentifierNetwork,
};

void logException(NSException* exception);
void swizzle(Class c, SEL orig, SEL new);

Expand Down Expand Up @@ -69,6 +73,7 @@ void swizzle(Class c, SEL orig, SEL new);
+(MLXMLNode* _Nullable) candidate2xml:(NSString*) candidate withMid:(NSString*) mid pwd:(NSString* _Nullable) pwd ufrag:(NSString* _Nullable) ufrag andInitiator:(BOOL) initiator;
+(NSString* _Nullable) xml2candidate:(MLXMLNode*) xml withInitiator:(BOOL) initiator;

+(NSRunLoop*) getExtraRunloopWithIdentifier:(MLRunLoopIdentifier) identifier;
+(NSError* _Nullable) hardLinkOrCopyFile:(NSString*) from to:(NSString*) to;
+(NSString*) getQueueThreadLabelFor:(DDLogMessage*) logMessage;
+(BOOL) shouldProvideVoip;
Expand Down
41 changes: 41 additions & 0 deletions Monal/Classes/HelperTools.m
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,47 @@ +(NSString*) getSelectedPushServerBasedOnLocale
];
}

+(NSRunLoop*) getExtraRunloopWithIdentifier:(MLRunLoopIdentifier) identifier
{
static NSMutableDictionary* runloops = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
runloops = [NSMutableDictionary new];
});

//every identifier has its own thread priority/qos class
__block dispatch_queue_priority_t priority;
switch(identifier)
{
case MLRunLoopIdentifierNetwork: priority = DISPATCH_QUEUE_PRIORITY_BACKGROUND; break;
default: unreachable(@"unknown runloop identifier!");
}

@synchronized(runloops) {
if(runloops[@(identifier)] == nil)
{
NSCondition* condition = [NSCondition new];
[condition lock];
dispatch_async(dispatch_get_global_queue(priority, 0), ^{
//we don't need an @synchronized block around this because the @synchronized block of the outer thread
//waits until we signal our condition (e.g. no other thread can race with us)
NSRunLoop* localLoop = runloops[@(identifier)] = [NSRunLoop currentRunLoop];
[condition lock];
[condition signal];
[condition unlock];
while(YES)
{
[localLoop run];
usleep(10000); //sleep 10ms if we ever return from our runloop to not consume too much cpu
}
});
[condition wait];
[condition unlock];
}
return runloops[@(identifier)];
}
}

+(NSError* _Nullable) hardLinkOrCopyFile:(NSString*) from to:(NSString*) to
{
NSError* error = nil;
Expand Down
179 changes: 91 additions & 88 deletions Monal/Classes/MLPipe.m
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
//

#import "MLPipe.h"
#import "HelperTools.h"

#define kPipeBufferSize 4096
static uint8_t _staticOutputBuffer[kPipeBufferSize+1]; //+1 for '\0' needed for logging the received raw bytes

@interface MLPipe()
{
//buffer for writes to the output stream that can not be completed
uint8_t * _outputBuffer;
uint8_t* _outputBuffer;
size_t _outputBufferByteCount;
}

Expand All @@ -31,7 +33,7 @@ -(id) initWithInputStream:(NSInputStream*) inputStream andOuterDelegate:(id<NSSt
_delegate = outerDelegate;
_outputBufferByteCount = 0;
[_input setDelegate:self];
[_input scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_input scheduleInRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
return self;
}

Expand All @@ -55,15 +57,15 @@ -(void) close
{
DDLogInfo(@"Closing pipe: input end");
[_input setDelegate:nil];
[_input removeFromRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_input removeFromRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
[_input close];
_input = nil;
}
if(_output)
{
DDLogInfo(@"Closing pipe: output end");
[_output setDelegate:nil];
[_output removeFromRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_output removeFromRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
[_output close];
_output = nil;
}
Expand All @@ -84,7 +86,7 @@ -(NSInputStream*) getNewOutputStream
{
DDLogInfo(@"Pipe making output stream orphan");
[_output setDelegate:nil];
[_output removeFromRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_output removeFromRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
[_output close];
_output = nil;
}
Expand All @@ -98,7 +100,7 @@ -(NSInputStream*) getNewOutputStream
NSInputStream* inputStream = (__bridge_transfer NSInputStream *)readStream;
_output = (__bridge_transfer NSOutputStream *)writeStream;
[_output setDelegate:self];
[_output scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_output scheduleInRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
[_output open];
[inputStream open];
return inputStream;
Expand All @@ -113,125 +115,126 @@ -(NSNumber*) drainInputStreamAndCloseOutputStream
{
DDLogInfo(@"Pipe making output stream orphan");
[_output setDelegate:nil];
[_output removeFromRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_output removeFromRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
[_output close];
_output = nil;
}
[self cleanupOutputBuffer];

NSInteger drainedBytes = 0;
uint8_t* buf = malloc(kPipeBufferSize+1);
NSInteger len = 0;
do
{
if(![_input hasBytesAvailable])
break;
len = [_input read:buf maxLength:kPipeBufferSize];
DDLogVerbose(@"Pipe drained %ld bytes", (long)len);
if(len>0) {
//read bytes but don't increment _outputBufferByteCount (e.g. ignore these bytes)
len = [_input read:_staticOutputBuffer maxLength:kPipeBufferSize];
DDLogDebug(@"Pipe drained %ld bytes", (long)len);
if(len > 0)
{
drainedBytes += len;
buf[len] = '\0'; //null termination for log output of raw string
DDLogVerbose(@"Pipe got raw drained string '%s'", buf);
_staticOutputBuffer[len] = '\0'; //null termination for log output of raw string
DDLogDebug(@"Pipe got raw drained string '%s'", _staticOutputBuffer);
}
} while(len>0 && [_input hasBytesAvailable]);
free(buf);
DDLogVerbose(@"Pipe done draining %ld bytes", (long)drainedBytes);
} while(len > 0 && [_input hasBytesAvailable]);
DDLogDebug(@"Pipe done draining %ld bytes", (long)drainedBytes);
return @(drainedBytes);
}
}

-(void) cleanupOutputBuffer
{
@synchronized(self) {
if(_outputBuffer)
{
DDLogVerbose(@"Pipe throwing away data in output buffer: %ld bytes", (long)_outputBufferByteCount);
free(_outputBuffer);
}
_outputBuffer = nil;
if(_outputBufferByteCount > 0)
DDLogDebug(@"Pipe throwing away data in output buffer: %ld bytes", (long)_outputBufferByteCount);
_outputBuffer = _staticOutputBuffer;
_outputBufferByteCount = 0;
}
}

-(void) process
{
//only start processing if piping is possible
if(!_output || ![_output hasSpaceAvailable])
{
DDLogVerbose(@"not starting pipe processing, _output = %@", _output);
return;
}

//DDLogVerbose(@"starting pipe processing");

//try to send remaining buffered data first
if(_outputBufferByteCount > 0)
{
NSInteger writtenLen = [_output write:_outputBuffer maxLength:_outputBufferByteCount];
if(writtenLen > 0)
@synchronized(self) {
//only start processing if piping is possible
if(!_output)
{
if((NSUInteger) writtenLen != _outputBufferByteCount) //some bytes remaining to send
DDLogDebug(@"not starting pipe processing: no output stream available");
return;
}
if(![_output hasSpaceAvailable])
{
DDLogDebug(@"not starting pipe processing: no space to write");
return;
}

//DDLogVerbose(@"starting pipe processing");

//try to send remaining buffered data first
if(_outputBufferByteCount > 0)
{
DDLogDebug(@"trying to send buffered data: %lu bytes", (unsigned long)_outputBufferByteCount);
NSInteger writtenLen = [_output write:_outputBuffer maxLength:_outputBufferByteCount];
if(writtenLen > 0)
{
memmove(_outputBuffer, _outputBuffer+(size_t)writtenLen, _outputBufferByteCount-(size_t)writtenLen);
_outputBufferByteCount -= writtenLen;
DDLogVerbose(@"pipe processing sent part of buffered data");
return;
if((NSUInteger) writtenLen != _outputBufferByteCount) //some bytes remaining to send
{
_outputBuffer += writtenLen;
_outputBufferByteCount -= writtenLen;
DDLogDebug(@"pipe processing sent part of buffered data: %ld", (long)writtenLen);
return;
}
else
{
//reset empty buffer
_outputBuffer = _staticOutputBuffer;
_outputBufferByteCount = 0; //everything sent
DDLogDebug(@"pipe processing sent all remaining buffered data");
}
}
else
{
//dealloc empty buffer
free(_outputBuffer);
_outputBuffer = nil;
_outputBufferByteCount = 0; //everything sent
DDLogVerbose(@"pipe processing sent all buffered data");
NSError* error = [_output streamError];
DDLogError(@"pipe sending failed with error %ld domain %@ message %@", (long)error.code, error.domain, error.userInfo);
return;
}
}
else

//return here if we have nothing to read
if(![_input hasBytesAvailable])
{
NSError* error = [_output streamError];
DDLogError(@"pipe sending failed with error %ld domain %@ message %@", (long)error.code, error.domain, error.userInfo);
DDLogDebug(@"stopped pipe processing: nothing to read");
return;
}
}

//return here if we have nothing to read
if(![_input hasBytesAvailable])
{
//DDLogVerbose(@"stopped pipe processing: nothing to read");
return;
}

uint8_t* buf = malloc(kPipeBufferSize+1);
NSInteger readLen = 0;
NSInteger writtenLen = 0;
do
{
readLen = [_input read:buf maxLength:kPipeBufferSize];
DDLogVerbose(@"pipe read %ld bytes", (long)readLen);
if(readLen>0) {
buf[readLen] = '\0'; //null termination for log output of raw string
DDLogVerbose(@"RECV: %s", buf);
writtenLen = [_output write:buf maxLength:readLen];
if(writtenLen == -1)
{
NSError* error = [_output streamError];
DDLogError(@"pipe sending failed with error %ld domain %@ message %@", (long)error.code, error.domain, error.userInfo);
break;
}
else if(writtenLen < readLen)

NSInteger readLen = 0;
NSInteger writtenLen = 0;
do {
readLen = [_input read:_outputBuffer maxLength:kPipeBufferSize];
if(readLen > 0)
{
DDLogVerbose(@"pipe could only write %ld of %ld bytes, buffering", (long)writtenLen, (long)readLen);
//allocate new _outputBuffer
_outputBuffer = malloc(sizeof(uint8_t) * (readLen-writtenLen));
//copy the remaining data into the buffer and set the buffer pointer accordingly
memcpy(_outputBuffer, buf+(size_t)writtenLen, (size_t)(readLen-writtenLen));
_outputBufferByteCount = (size_t)(readLen-writtenLen);
break;
_outputBuffer[readLen] = '\0'; //null termination for log output of raw string
DDLogVerbose(@"RECV(%ld): %s", (long)readLen, _outputBuffer);
writtenLen = [_output write:_outputBuffer maxLength:readLen];
if(writtenLen == -1)
{
NSError* error = [_output streamError];
DDLogError(@"pipe sending failed with error %ld domain %@ message %@", (long)error.code, error.domain, error.userInfo);
break;
}
else if(writtenLen < readLen)
{
DDLogDebug(@"pipe could only write %ld of %ld bytes, buffering", (long)writtenLen, (long)readLen);
//set the buffer pointer to the remaining data and leave our copy loop
_outputBuffer += (size_t)writtenLen;
_outputBufferByteCount = (size_t)(readLen-writtenLen);
break;
}
}
}
} while(readLen>0 && [_input hasBytesAvailable] && [_output hasSpaceAvailable]);
free(buf);
//DDLogVerbose(@"pipe processing done");
else
DDLogDebug(@"pipe read %ld <= 0 bytes", (long)readLen);
} while(readLen > 0 && [_input hasBytesAvailable] && [_output hasSpaceAvailable]);
//DDLogVerbose(@"pipe processing done");
}
}

-(void) stream:(NSStream*) stream handleEvent:(NSStreamEvent) eventCode
Expand All @@ -247,7 +250,7 @@ -(void) stream:(NSStream*) stream handleEvent:(NSStreamEvent) eventCode
//only log open and none events
case NSStreamEventOpenCompleted:
{
DDLogVerbose(@"Pipe stream %@ completed open", stream);
DDLogDebug(@"Pipe stream %@ completed open", stream);
break;
}

Expand Down
4 changes: 2 additions & 2 deletions Monal/Classes/xmpp.m
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ -(void) createStreams
//open sockets and start connecting (including TLS handshake if isDirectTLS==YES)
DDLogInfo(@"opening TCP streams");
[_oStream setDelegate:self];
[_oStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[_oStream scheduleInRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];
_iPipe = [[MLPipe alloc] initWithInputStream:localIStream andOuterDelegate:self];
[localIStream open];
[_oStream open];
Expand Down Expand Up @@ -1160,7 +1160,7 @@ -(void) closeSocket
[self cleanupSendQueue];

//remove from runloop *after* cleaning up sendQueue (maybe this fixes a rare crash)
[self->_oStream removeFromRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
[self->_oStream removeFromRunLoop:[HelperTools getExtraRunloopWithIdentifier:MLRunLoopIdentifierNetwork] forMode:NSDefaultRunLoopMode];

DDLogInfo(@"resetting internal stream state to disconnected");
self->_startTLSComplete = NO;
Expand Down

0 comments on commit 7663e02

Please sign in to comment.