Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(gql_websocket_link): upgrade web_socket_channel to v3.0.1, rxdart to >=0.26.0 <= 0.28.0 #475

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion links/gql_websocket_link/lib/src/graphql_transport_ws.dart
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,12 @@ class TransportWsClientOptions {
class _Connected {
final WebSocketChannel socket;
final Future<void> throwOnClose;
final Future<LikeCloseEvent?> likeCloseEvent;

_Connected(
this.socket,
this.throwOnClose,
this.likeCloseEvent,
);
}

Expand Down Expand Up @@ -879,9 +881,26 @@ class _ConnectionState {
retries = 0; // reset the retries on connect
final _completer = Completer<void>();
errorOrClosed(_completer.completeError);
// workground for dart linux bug: complete error not being caught by try..catch block
final _likeCloseEventCompleter = Completer<LikeCloseEvent?>();
connected(_Connected(
socket,
_completer.future,
_completer.future.catchError((err) {
// workground for dart linux bug: complete error not being caught by try..catch block
// if the connection is closed, the error is not fatal
if (err is LikeCloseEvent) {
if (!_likeCloseEventCompleter.isCompleted) {
_likeCloseEventCompleter.complete(err);
}
return;
}
throw err;
}).whenComplete(() {
if (!_likeCloseEventCompleter.isCompleted) {
_likeCloseEventCompleter.complete(null);
}
}),
_likeCloseEventCompleter.future,
));
} catch (err) {
// stop reading messages as soon as reading breaks once
Expand Down Expand Up @@ -930,6 +949,7 @@ class _ConnectionState {

final socket = _connection.socket;
final throwOnClose = _connection.throwOnClose;
final likeCloseEvent = _connection.likeCloseEvent;

// if the provided socket is in a closing state, wait for the throw on close
// TODO: WebSocketChannel should have a `state` getter
Expand Down Expand Up @@ -972,6 +992,7 @@ class _ConnectionState {
// or
throwOnClose,
]),
waitForLikeCloseEvent: likeCloseEvent,
);
}
}
Expand Down Expand Up @@ -1013,6 +1034,7 @@ class _Client extends TransportWsClient {
final socket = _c.socket;
final release = _c.release;
final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose;
final waitForLikeCloseEvent = _c.waitForLikeCloseEvent;
// print("isolate debug name: ${Isolate.current.debugName}");
// print(payload.operation.toString());
// print(payload.variables.toString());
Expand Down Expand Up @@ -1072,6 +1094,12 @@ class _Client extends TransportWsClient {
// whatever happens though, we want to stop listening for messages
await waitForReleaseOrThrowOnClose.whenComplete(unlisten);

// workground for dart linux bug: complete error not being caught by try..catch block
final likeCloseEvent = await waitForLikeCloseEvent;
if (likeCloseEvent != null) {
throw likeCloseEvent;
}

return; // completed, shouldnt try again
} catch (errOrCloseEvent) {
if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return;
Expand Down Expand Up @@ -1123,11 +1151,13 @@ class _Connection {
final WebSocketChannel socket;
final Completer<void> release;
final Future<void> waitForReleaseOrThrowOnClose;
final Future<LikeCloseEvent?> waitForLikeCloseEvent;

_Connection({
required this.socket,
required this.release,
required this.waitForReleaseOrThrowOnClose,
required this.waitForLikeCloseEvent,
});
}

Expand Down
4 changes: 2 additions & 2 deletions links/gql_websocket_link/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ dependencies:
gql_exec: ^1.0.0
gql_link: ^1.0.0
meta: ^1.3.0
rxdart: '>=0.26.0 <0.28.0'
rxdart: '>=0.26.0 <=0.28.0'
uuid: '>=3.0.0 <5.0.0'
web_socket_channel: ^2.0.0
web_socket_channel: ^3.0.1
dev_dependencies:
gql_pedantic: ^1.0.2
mockito: ^5.0.0
Expand Down
24 changes: 22 additions & 2 deletions links/gql_websocket_link/test/gql_websocket_link_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,6 @@ void _testLinks(
link.request(request).listen(
expectAsync1(
(response) {
print(response);
expect(
response.data,
responseData1,
Expand Down Expand Up @@ -1239,9 +1238,16 @@ void _testLinks(
server.transform(WebSocketTransformer());

webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);
channel.stream.asBroadcastStream().listen(
null,
onError: (Object err) {
print(err);
},
onDone: () => print("done"),
);
// Close the socket to cause network error.
await webSocket.close();
channel = IOWebSocketChannel(webSocket);
link = makeLink(null, channelGenerator: () => channel);
expect(
link.request(request).first,
Expand Down Expand Up @@ -1466,7 +1472,15 @@ void _testLinks(
),
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.completeError("Timeout");
}
});

server = await HttpServer.bind("localhost", 0);
var connectCount = 0;
server.transform(WebSocketTransformer()).take(2).listen(
expectAsync1(
(webSocket) async {
Expand All @@ -1485,6 +1499,10 @@ void _testLinks(
),
);
webSocket.close(websocket_status.goingAway);
connectCount++;
if (connectCount == 2) {
completer.complete();
}
}
messageCount++;
},
Expand All @@ -1509,6 +1527,8 @@ void _testLinks(
);
//
link.request(request).listen(print, onError: print);
await completer.future;
timer.cancel();
},
);

Expand Down
Loading