diff --git a/lib/src/api/bucket.dart b/lib/src/api/bucket.dart index 3164dca..3d785c2 100644 --- a/lib/src/api/bucket.dart +++ b/lib/src/api/bucket.dart @@ -182,5 +182,7 @@ class BlobEventWorker implements Worker { requestStream.add(ctx.toResponse()); } } + + await channel.shutdown(); } } diff --git a/lib/src/resources/api.dart b/lib/src/resources/api.dart index 707d942..7979a1f 100644 --- a/lib/src/resources/api.dart +++ b/lib/src/resources/api.dart @@ -38,13 +38,15 @@ class Api extends Resource { var apiResource = $p.ApiResource(); for (var opt in opts.security) { - await _attach_oidc(name, opt); + await _attachOidc(name, opt); apiResource.security[opt.name] = $p.ApiScopes(scopes: opt.scopes); } await client .declare($p.ResourceDeclareRequest(id: resource, api: apiResource)); + + await channel.shutdown(); } /// A GET request [handler] that [match]es a specific route. @@ -170,7 +172,7 @@ class ApiWorker implements Worker { var options = $ap.ApiWorkerOptions(); for (var s in security) { - await _attach_oidc(route.api.name, s); + await _attachOidc(route.api.name, s); options.security[s.name] = $ap.ApiWorkerScopes(scopes: s.scopes); } @@ -189,7 +191,8 @@ class ApiWorker implements Worker { final initMsg = $ap.ClientMessage(registrationRequest: registrationRequest); // Create the request stream and send the initial message - final requestStream = StreamController<$ap.ClientMessage>(); + final requestStream = + StreamController<$ap.ClientMessage>(onCancel: () => channel.shutdown()); requestStream.add(initMsg); final response = client.serve( @@ -216,5 +219,7 @@ class ApiWorker implements Worker { requestStream.add(ctx.toResponse()); } } + + await channel.shutdown(); } } diff --git a/lib/src/resources/common.dart b/lib/src/resources/common.dart index 49e7e8a..f8ac410 100644 --- a/lib/src/resources/common.dart +++ b/lib/src/resources/common.dart @@ -51,10 +51,12 @@ abstract class Resource { @protected late final $p.ResourcesClient client; + late final ClientChannel channel; + @protected Resource(this.name, $p.ResourcesClient? client) { if (client == null) { - final channel = createClientChannelFromEnvVar(); + channel = createClientChannelFromEnvVar(); this.client = $p.ResourcesClient(channel); } else { @@ -86,5 +88,7 @@ abstract class SecureResource extends Resource { await client .declare($p.ResourceDeclareRequest(policy: policy, id: policyResource)); + + await channel.shutdown(); } } diff --git a/lib/src/resources/oidc.dart b/lib/src/resources/oidc.dart index 58ca6d2..d797ebe 100644 --- a/lib/src/resources/oidc.dart +++ b/lib/src/resources/oidc.dart @@ -11,7 +11,7 @@ class OidcOptions { typedef SecurityOption = OidcOptions Function(List scopes); -Future _attach_oidc( +Future _attachOidc( String apiName, OidcOptions options) async { var secDef = OidcSecurityDefinition(apiName, options); await secDef.register(); @@ -46,5 +46,7 @@ class OidcSecurityDefinition extends Resource { await client.declare($p.ResourceDeclareRequest( id: resource, apiSecurityDefinition: securityDefinition)); + + await channel.shutdown(); } } diff --git a/lib/src/resources/schedule.dart b/lib/src/resources/schedule.dart index 62ccd71..107b583 100644 --- a/lib/src/resources/schedule.dart +++ b/lib/src/resources/schedule.dart @@ -6,7 +6,10 @@ class Schedule extends Resource { @override Future register() async { var res = $p.ResourceIdentifier(name: name, type: $p.ResourceType.Schedule); + await client.declare($p.ResourceDeclareRequest(id: res)); + + await channel.shutdown(); } /// Run [middleware] at a certain interval defined by the [rate]. E.g. '7 days', '3 hours', '30 minutes'. @@ -16,7 +19,7 @@ class Schedule extends Resource { var worker = IntervalWorker(registrationRequest, middleware); - worker.start(); + await worker.start(); } /// Run [middleware] at a certain interval defined by the [cronExpression]. @@ -28,7 +31,7 @@ class Schedule extends Resource { var worker = IntervalWorker(registrationRequest, middleware); - worker.start(); + await worker.start(); } } @@ -74,5 +77,7 @@ class IntervalWorker implements Worker { requestStream.add(ctx.toResponse()); } } + + await channel.shutdown(); } } diff --git a/lib/src/resources/topic.dart b/lib/src/resources/topic.dart index 9f11a79..f09dc73 100644 --- a/lib/src/resources/topic.dart +++ b/lib/src/resources/topic.dart @@ -19,6 +19,8 @@ class Topic extends SecureResource { $p.ResourceDeclareRequest(id: resource, topic: $p.TopicResource())); registrationCompletion.complete(resource); + + await channel.shutdown(); } @override @@ -89,5 +91,7 @@ class SubscriptionWorker implements Worker { requestStream.add(ctx.toResponse()); } } + + await channel.shutdown(); } } diff --git a/lib/src/resources/websocket.dart b/lib/src/resources/websocket.dart index d88d138..8adac91 100644 --- a/lib/src/resources/websocket.dart +++ b/lib/src/resources/websocket.dart @@ -17,6 +17,8 @@ class Websocket extends Resource { ); await client.declare($p.ResourceDeclareRequest(id: resource)); + + await channel.shutdown(); } /// Send message [data] to a connection, referenced by its [connectionId]. @@ -103,5 +105,7 @@ class WebsocketWorker implements Worker { requestStream.add(ctx.toResponse()); } } + + await channel.shutdown(); } }