Skip to content

Commit

Permalink
adds robust examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ericwindmill committed Jan 2, 2024
1 parent b18904a commit 7fbb199
Show file tree
Hide file tree
Showing 11 changed files with 1,127 additions and 262 deletions.

This file was deleted.

This file was deleted.

10 changes: 10 additions & 0 deletions examples/concurrency/lib/basic_ports_example/complete.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ void main() async {
await worker.parseJson('{"key":"value"}');
}

// #docregion handleResponses parseJson
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync();
// #enddocregion handleResponses parseJson

// #docregion spawn
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
// #enddocregion spawn

// #docregion handleResponses
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
Expand All @@ -26,7 +31,9 @@ class Worker {
print(message);
}
}
// #enddocregion handleResponses

// #docregion startRemoteIsolate
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
Expand All @@ -38,9 +45,12 @@ class Worker {
}
});
}
// #enddocregion startRemoteIsolate

// #docregion parseJson
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
// #enddocregion parseJson
}
110 changes: 110 additions & 0 deletions examples/concurrency/lib/robust_ports_example/complete.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
worker.close();
}

// #docregion constructor
class Worker {
final SendPort _commands;
final ReceivePort _responses;
// #enddocregion constructor
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;

Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}

static Future<Worker> spawn() async {
// Create a receive port and add it's initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};

// Spawn the isolate
final Isolate isolate;
try {
isolate = await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}

final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;

return Worker._(receivePort, sendPort);
}

Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}

void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id)!;

if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}

if (_closed && _activeRequests.isEmpty) _responses.close();
}

static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}

static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}

void close() {
if (!_closed) {
_closed = true;
_commands.send("shutdown");
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
Loading

0 comments on commit 7fbb199

Please sign in to comment.