Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test(gql_websocket_link): new async task management tests
Browse files Browse the repository at this point in the history
agufagit committed Nov 4, 2024
1 parent 7e8d99a commit a6ba1cd
Showing 1 changed file with 420 additions and 1 deletion.
421 changes: 420 additions & 1 deletion links/gql_websocket_link/test/gql_websocket_link_test.dart
Original file line number Diff line number Diff line change
@@ -117,6 +117,7 @@ void _testLinks(
}) {
final dataMessageType = isApolloSubProtocol ? "data" : "next";
final startMessageType = isApolloSubProtocol ? "start" : "subscribe";
final endMessageType = isApolloSubProtocol ? "stop" : "complete";

test(
"send connection_init",
@@ -134,6 +135,13 @@ void _testLinks(
),
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.completeError("Timeout");
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
@@ -144,6 +152,9 @@ void _testLinks(
final map =
json.decode(message as String) as Map<String, dynamic>;
expect(map["type"], MessageTypes.connectionInit);
if (!completer.isCompleted) {
completer.complete();
}
},
),
);
@@ -156,6 +167,9 @@ void _testLinks(
link = makeLink(null, channelGenerator: () => channel);
//
link.request(request).listen(print);

await completer.future;
timer.cancel();
},
);

@@ -176,6 +190,13 @@ void _testLinks(
),
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.completeError("Timeout");
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
@@ -187,6 +208,9 @@ void _testLinks(
json.decode(message as String) as Map<String, dynamic>;
expect(map["type"], MessageTypes.connectionInit);
expect(map["payload"], initialPayload);
if (!completer.isCompleted) {
completer.complete();
}
},
),
);
@@ -204,6 +228,8 @@ void _testLinks(
);
//
link.request(request).listen(print);
await completer.future;
timer.cancel();
},
);

@@ -227,6 +253,13 @@ void _testLinks(
),
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.completeError("Timeout");
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
@@ -238,6 +271,9 @@ void _testLinks(
json.decode(message as String) as Map<String, dynamic>;
expect(map["type"], MessageTypes.connectionInit);
expect(map["payload"], baseInitialPayload);
if (!completer.isCompleted) {
completer.complete();
}
},
),
);
@@ -254,6 +290,9 @@ void _testLinks(
);
//
link.request(request).listen(print);

await completer.future;
timer.cancel();
},
);

@@ -706,6 +745,370 @@ void _testLinks(
},
);

test(
"yield complete/stop message before next/data message",
() async {
HttpServer server;
WebSocket webSocket;
IOWebSocketChannel channel;
Link link;
Request request;
final responseData1 = {
"data": {
"pokemons": [
{"name": "Bulbasaur"},
{"name": "Ivysaur"},
{"name": "Venusaur"}
]
}
};

request = Request(
operation: Operation(
operationName: "pokemonsSubscription",
document: parseString(
r"subscription MySubscription { pokemons(first: $first) { name } }"),
),
variables: const <String, dynamic>{
"first": 3,
},
);

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen(
(dynamic message) async {
final map =
json.decode(message as String) as Map<String, dynamic>;
if (map["type"] == "connection_init") {
channel.sink.add(
json.encode(
ConnectionAck(),
),
);
} else if (map["type"] == startMessageType) {
// send end message to complete the operation
channel.sink.add(
json.encode(
<String, dynamic>{
"type": endMessageType,
"id": map["id"],
},
),
);
await Future<void>.delayed(Duration(seconds: 1));
// client should wait for this message to come before completing the operation
channel.sink.add(
json.encode(
<String, dynamic>{
"type": dataMessageType,
"id": map["id"],
"payload": {
"data": responseData1,
"errors": null,
},
},
),
);
}
},
);
},
);

webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);
link = makeLink(null, channelGenerator: () => channel);
link.request(request).listen(
expectAsync1(
(response) {
expect(
response.data,
responseData1,
);
expect(response.errors, null);
expect(
response.context.entry<ResponseExtensions>()?.extensions,
null,
);
},
),
);
},
);

test(
"conneciton is kept alive when no operations are in process",
() async {
// only test for transport ws sub-protocol
if (isApolloSubProtocol) {
return;
}
HttpServer server;
WebSocket webSocket;
IOWebSocketChannel channel;
Link link;
Request request;
final responseData1 = {
"data": {
"pokemons": [
{"name": "Bulbasaur"},
{"name": "Ivysaur"},
{"name": "Venusaur"}
]
}
};

request = Request(
operation: Operation(
operationName: "pokemonsSubscription",
document: parseString(
r"subscription MySubscription { pokemons(first: $first) { name } }"),
),
variables: const <String, dynamic>{
"first": 3,
},
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.complete();
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen(
(dynamic message) async {
final map =
json.decode(message as String) as Map<String, dynamic>;
if (map["type"] == "connection_init") {
channel.sink.add(
json.encode(
ConnectionAck(),
),
);
} else if (map["type"] == startMessageType) {
channel.sink.add(
json.encode(
<String, dynamic>{
"type": dataMessageType,
"id": map["id"],
"payload": {
"data": responseData1,
"errors": null,
},
},
),
);
// send end message to complete the operation
channel.sink.add(
json.encode(
<String, dynamic>{
"type": endMessageType,
"id": map["id"],
},
),
);
}
},
);
},
);

// test case 1: The connection should be kept alive
webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);
final cHashCode = channel.hashCode;
final wsHashCode = webSocket.hashCode;
link = makeLink(null,
channelGenerator: () => channel,
inactivityTimeout: Duration(seconds: 5));
link.request(request).listen(
expectAsync1(
(response) {
expect(
response.data,
responseData1,
);
expect(response.errors, null);
expect(
response.context.entry<ResponseExtensions>()?.extensions,
null,
);
if (!completer.isCompleted) {
completer.complete();
}
},
),
);

await completer.future;
timer.cancel();
// The connection should be kept alive
await Future.delayed(Duration(seconds: 1));
expect(webSocket.closeCode, null);
expect(channel.hashCode, cHashCode);
expect(webSocket.hashCode, wsHashCode);
await channel.sink.close(1000, "Normal Closure");

// test case 2: The connection should be closed, not keepAlive
final completer1 = Completer<void>();
final timer1 = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.complete();
}
});

webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);
link = makeLink(null, channelGenerator: () => channel);
link.request(request).listen(
expectAsync1(
(response) {
print(response);
expect(
response.data,
responseData1,
);
expect(response.errors, null);
expect(
response.context.entry<ResponseExtensions>()?.extensions,
null,
);
if (!completer1.isCompleted) {
completer1.complete();
}
},
),
);

await completer1.future;
timer1.cancel();
await Future.delayed(Duration(seconds: 1));
expect(webSocket.closeCode, 1000);
},
);

test(
"operation is completed by error",
() async {
// only test for transport ws sub-protocol
if (isApolloSubProtocol) {
return;
}
HttpServer server;
WebSocket webSocket;
IOWebSocketChannel channel;
Link link;
Request request;

request = Request(
operation: Operation(
operationName: "pokemonsSubscription",
document: parseString(
r"subscription MySubscription { pokemons(first: $first) { name } }"),
),
variables: const <String, dynamic>{
"first": 3,
},
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.complete();
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen(
(dynamic message) async {
final map =
json.decode(message as String) as Map<String, dynamic>;
if (map["type"] == "connection_init") {
channel.sink.add(
json.encode(
ConnectionAck(),
),
);
} else if (map["type"] == startMessageType) {
channel.sink.add(
json.encode(
<String, dynamic>{
"type": "error",
"id": map["id"],
"payload": [
{
"message": "error message 2.1",
"locations": [
{"column": 1, "line": 2}
]
},
{
"message": "error message 2.2",
}
],
},
),
);
}
},
);
},
);

webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);
final cHashCode = channel.hashCode;
final wsHashCode = webSocket.hashCode;
link = makeLink(null,
channelGenerator: () => channel,
inactivityTimeout: Duration(seconds: 5));
link.request(request).listen(
(event) {},
onError: (Object err, StackTrace stack) {
final errors = err as List<GraphQLError>;
expect(errors.length, 2);
expect(
errors.first.message,
"error message 2.1",
);
expect(
errors.first.locations!
.map((e) => {"column": e.column, "line": e.line}),
[
{"column": 1, "line": 2}
],
);
expect(
errors.last.message,
"error message 2.2",
);
if (!completer.isCompleted) {
completer.complete();
}
},
);

await completer.future;
timer.cancel();
// The connection should be kept alive
expect(webSocket.closeCode, null);
expect(channel.hashCode, cHashCode);
expect(webSocket.hashCode, wsHashCode);
await channel.sink.close(1000, "Normal Closure");
},
);

test("throw WebSocketLinkParserException when unable to parse response",
() async {
HttpServer server;
@@ -861,6 +1264,13 @@ void _testLinks(
),
);

final completer = Completer<void>();
final timer = Timer(const Duration(seconds: 5), () {
if (!completer.isCompleted) {
completer.completeError("Timeout");
}
});

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
@@ -898,7 +1308,13 @@ void _testLinks(
}
messageCount++;
},
() => messageCount == 3,
() {
final isDone = messageCount == 3;
if (isDone && !completer.isCompleted) {
completer.complete();
}
return isDone;
},
),
);
},
@@ -908,6 +1324,9 @@ void _testLinks(
channel = IOWebSocketChannel(webSocket);
link = makeLink(null, channelGenerator: () => channel);
responseSub = link.request(request).listen(print);

await completer.future;
timer.cancel();
});

test(

0 comments on commit a6ba1cd

Please sign in to comment.