From cdba022a09be13abbea7c2b8bd09cf6720cc4e89 Mon Sep 17 00:00:00 2001 From: Ryan Cartwright Date: Fri, 4 Oct 2024 13:49:57 +1000 Subject: [PATCH] handle sigterm and sigint in worker code --- lib/src/workers/api.dart | 2 +- lib/src/workers/batch.dart | 2 +- lib/src/workers/blob_event.dart | 2 +- lib/src/workers/common.dart | 15 ++++++++++++++- lib/src/workers/file_event.dart | 2 +- lib/src/workers/interval.dart | 2 +- lib/src/workers/subscription.dart | 2 +- lib/src/workers/websocket.dart | 2 +- 8 files changed, 21 insertions(+), 8 deletions(-) diff --git a/lib/src/workers/api.dart b/lib/src/workers/api.dart index 5e14ac9..3a342a7 100644 --- a/lib/src/workers/api.dart +++ b/lib/src/workers/api.dart @@ -18,7 +18,7 @@ class ApiWorker extends Worker<$ap.ApiClient> { /// Start the route handler. @override - Future start() async { + Future _startWorkerLoop() async { // Create API client var options = $ap.ApiWorkerOptions(securityDisabled: security.isEmpty); diff --git a/lib/src/workers/batch.dart b/lib/src/workers/batch.dart index d474e85..4cd4f0a 100644 --- a/lib/src/workers/batch.dart +++ b/lib/src/workers/batch.dart @@ -7,7 +7,7 @@ class JobWorker extends Worker<$jp.JobClient> { JobWorker(this.registrationRequest, this.middleware); @override - Future start() async { + Future _startWorkerLoop() async { // Create the request to register the subscription with the membrane final initMsg = $jp.ClientMessage(registrationRequest: registrationRequest); diff --git a/lib/src/workers/blob_event.dart b/lib/src/workers/blob_event.dart index db27dd8..4630f12 100644 --- a/lib/src/workers/blob_event.dart +++ b/lib/src/workers/blob_event.dart @@ -7,7 +7,7 @@ class BlobEventWorker extends Worker<$bp.StorageListenerClient> { BlobEventWorker(this.registrationRequest, this.middleware); @override - Future start() async { + Future _startWorkerLoop() async { final initMsg = $bp.ClientMessage(registrationRequest: registrationRequest); // Create the request stream and send the initial message diff --git a/lib/src/workers/common.dart b/lib/src/workers/common.dart index 76a1a24..1f4038e 100644 --- a/lib/src/workers/common.dart +++ b/lib/src/workers/common.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'package:grpc/grpc.dart'; @@ -26,5 +27,17 @@ part 'websocket.dart'; part 'batch.dart'; abstract class Worker { - Future start(); + Future _startWorkerLoop(); + + Future start() async { + ProcessSignal.sigint.watch().listen((signal) { + exit(0); + }); + + ProcessSignal.sigterm.watch().listen((signal) { + exit(0); + }); + + await _startWorkerLoop(); + } } diff --git a/lib/src/workers/file_event.dart b/lib/src/workers/file_event.dart index 8842d64..2d8acc9 100644 --- a/lib/src/workers/file_event.dart +++ b/lib/src/workers/file_event.dart @@ -8,7 +8,7 @@ class FileEventWorker extends Worker<$bp.StorageListenerClient> { FileEventWorker(this.registrationRequest, this.middleware, this.bucket); @override - Future start() async { + Future _startWorkerLoop() async { final initMsg = $bp.ClientMessage(registrationRequest: registrationRequest); // Create the request stream and send the initial message diff --git a/lib/src/workers/interval.dart b/lib/src/workers/interval.dart index 0790eb9..b251896 100644 --- a/lib/src/workers/interval.dart +++ b/lib/src/workers/interval.dart @@ -8,7 +8,7 @@ class IntervalWorker extends Worker<$sp.SchedulesClient> { /// Starts the interval handling loop to run the [middleware] at a certain frequency. Uses the [registrationRequest] to register the interval with the Nitric server. @override - Future start() async { + Future _startWorkerLoop() async { final initMsg = $sp.ClientMessage(registrationRequest: registrationRequest); // Create the request stream and send the initial message diff --git a/lib/src/workers/subscription.dart b/lib/src/workers/subscription.dart index 42d4afb..4f3b2e1 100644 --- a/lib/src/workers/subscription.dart +++ b/lib/src/workers/subscription.dart @@ -7,7 +7,7 @@ class SubscriptionWorker extends Worker<$tp.SubscriberClient> { SubscriptionWorker(this.registrationRequest, this.middleware); @override - Future start() async { + Future _startWorkerLoop() async { // Create the request to register the subscription with the membrane final initMsg = $tp.ClientMessage(registrationRequest: registrationRequest); diff --git a/lib/src/workers/websocket.dart b/lib/src/workers/websocket.dart index 81000e2..b935d54 100644 --- a/lib/src/workers/websocket.dart +++ b/lib/src/workers/websocket.dart @@ -7,7 +7,7 @@ class WebsocketWorker extends Worker<$wp.WebsocketHandlerClient> { WebsocketWorker(this.registrationRequest, this.middleware); @override - Future start() async { + Future _startWorkerLoop() async { final initMsg = $wp.ClientMessage(registrationRequest: registrationRequest); // Create the request stream and send the initial message