Skip to content

Commit

Permalink
add channel close after declaration
Browse files Browse the repository at this point in the history
  • Loading branch information
HomelessDinosaur committed Mar 6, 2024
1 parent 9586189 commit 41c0930
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 7 deletions.
2 changes: 2 additions & 0 deletions lib/src/api/bucket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,7 @@ class BlobEventWorker implements Worker {
requestStream.add(ctx.toResponse());
}
}

await channel.shutdown();
}
}
11 changes: 8 additions & 3 deletions lib/src/resources/api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ class Api extends Resource {
var apiResource = $p.ApiResource();

for (var opt in opts.security) {
await _attach_oidc(name, opt);
await _attachOidc(name, opt);

apiResource.security[opt.name] = $p.ApiScopes(scopes: opt.scopes);
}

await client
.declare($p.ResourceDeclareRequest(id: resource, api: apiResource));

await channel.shutdown();
}

/// A GET request [handler] that [match]es a specific route.
Expand Down Expand Up @@ -170,7 +172,7 @@ class ApiWorker implements Worker {

var options = $ap.ApiWorkerOptions();
for (var s in security) {
await _attach_oidc(route.api.name, s);
await _attachOidc(route.api.name, s);

options.security[s.name] = $ap.ApiWorkerScopes(scopes: s.scopes);
}
Expand All @@ -189,7 +191,8 @@ class ApiWorker implements Worker {
final initMsg = $ap.ClientMessage(registrationRequest: registrationRequest);

// Create the request stream and send the initial message
final requestStream = StreamController<$ap.ClientMessage>();
final requestStream =
StreamController<$ap.ClientMessage>(onCancel: () => channel.shutdown());
requestStream.add(initMsg);

final response = client.serve(
Expand All @@ -216,5 +219,7 @@ class ApiWorker implements Worker {
requestStream.add(ctx.toResponse());
}
}

await channel.shutdown();
}
}
6 changes: 5 additions & 1 deletion lib/src/resources/common.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ abstract class Resource {
@protected
late final $p.ResourcesClient client;

late final ClientChannel channel;

@protected
Resource(this.name, $p.ResourcesClient? client) {
if (client == null) {
final channel = createClientChannelFromEnvVar();
channel = createClientChannelFromEnvVar();

this.client = $p.ResourcesClient(channel);
} else {
Expand Down Expand Up @@ -86,5 +88,7 @@ abstract class SecureResource<T extends Enum> extends Resource {

await client
.declare($p.ResourceDeclareRequest(policy: policy, id: policyResource));

await channel.shutdown();
}
}
4 changes: 3 additions & 1 deletion lib/src/resources/oidc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class OidcOptions {

typedef SecurityOption = OidcOptions Function(List<String> scopes);

Future<OidcSecurityDefinition> _attach_oidc(
Future<OidcSecurityDefinition> _attachOidc(
String apiName, OidcOptions options) async {
var secDef = OidcSecurityDefinition(apiName, options);
await secDef.register();
Expand Down Expand Up @@ -46,5 +46,7 @@ class OidcSecurityDefinition extends Resource {

await client.declare($p.ResourceDeclareRequest(
id: resource, apiSecurityDefinition: securityDefinition));

await channel.shutdown();
}
}
9 changes: 7 additions & 2 deletions lib/src/resources/schedule.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ class Schedule extends Resource {
@override
Future<void> register() async {
var res = $p.ResourceIdentifier(name: name, type: $p.ResourceType.Schedule);

await client.declare($p.ResourceDeclareRequest(id: res));

await channel.shutdown();
}

/// Run [middleware] at a certain interval defined by the [rate]. E.g. '7 days', '3 hours', '30 minutes'.
Expand All @@ -16,7 +19,7 @@ class Schedule extends Resource {

var worker = IntervalWorker(registrationRequest, middleware);

worker.start();
await worker.start();
}

/// Run [middleware] at a certain interval defined by the [cronExpression].
Expand All @@ -28,7 +31,7 @@ class Schedule extends Resource {

var worker = IntervalWorker(registrationRequest, middleware);

worker.start();
await worker.start();
}
}

Expand Down Expand Up @@ -74,5 +77,7 @@ class IntervalWorker implements Worker {
requestStream.add(ctx.toResponse());
}
}

await channel.shutdown();
}
}
4 changes: 4 additions & 0 deletions lib/src/resources/topic.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Topic extends SecureResource<TopicPermission> {
$p.ResourceDeclareRequest(id: resource, topic: $p.TopicResource()));

registrationCompletion.complete(resource);

await channel.shutdown();
}

@override
Expand Down Expand Up @@ -89,5 +91,7 @@ class SubscriptionWorker implements Worker {
requestStream.add(ctx.toResponse());
}
}

await channel.shutdown();
}
}
4 changes: 4 additions & 0 deletions lib/src/resources/websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Websocket extends Resource {
);

await client.declare($p.ResourceDeclareRequest(id: resource));

await channel.shutdown();
}

/// Send message [data] to a connection, referenced by its [connectionId].
Expand Down Expand Up @@ -103,5 +105,7 @@ class WebsocketWorker implements Worker {
requestStream.add(ctx.toResponse());
}
}

await channel.shutdown();
}
}

0 comments on commit 41c0930

Please sign in to comment.