diff --git a/service/delete_app_consumer.go b/service/delete_app_consumer.go index 57865ab4..27929b0e 100644 --- a/service/delete_app_consumer.go +++ b/service/delete_app_consumer.go @@ -48,6 +48,9 @@ func (s *deleteAppConsumer) ConsumeEvent(ctx context.Context, event *events.Even if s.walletPubkey == walletPubKey { s.nostrSubscription.Unsub() + // remove this consumer as subscriber in eventPublisher + s.svc.eventPublisher.RemoveSubscriber(s) + // get nip47 event info for this app wallet key nip47InfoEvent, err := s.svc.GetNip47Service().GetNip47Info(ctx, s.relay, s.walletPubkey) if err != nil { @@ -55,7 +58,7 @@ func (s *deleteAppConsumer) ConsumeEvent(ctx context.Context, event *events.Even return } if nip47InfoEvent != nil { - err := s.svc.nip47Service.PublishNip47InfoDeletion(ctx, s.relay, walletPubKey, walletPrivKey, nip47InfoEvent.ID) + err = s.svc.nip47Service.PublishNip47InfoDeletion(ctx, s.relay, walletPubKey, walletPrivKey, nip47InfoEvent.ID) if err != nil { logger.Logger.WithError(err).WithField("event", event).Error("Failed to publish nip47 info deletion") } diff --git a/service/start.go b/service/start.go index dab1a36c..3bb8426e 100644 --- a/service/start.go +++ b/service/start.go @@ -47,7 +47,7 @@ func (svc *service) startNostr(ctx context.Context) error { //Start infinite loop which will be only broken by canceling ctx (SIGINT) var relay *nostr.Relay waitToReconnectSeconds := 0 - + var createAppEventListener events.EventSubscriber for i := 0; ; i++ { // wait for a delay if any before retrying @@ -87,12 +87,16 @@ func (svc *service) startNostr(ctx context.Context) error { } waitToReconnectSeconds = 0 + // register a subscriber for events of "app_created" which handles creation of nostr subscription for new app + if createAppEventListener != nil { + svc.eventPublisher.RemoveSubscriber(createAppEventListener) + } + createAppEventListener = &createAppConsumer{svc: svc, relay: relay} + svc.eventPublisher.RegisterSubscriber(createAppEventListener) + // start each app wallet subscription which have a child derived wallet key svc.startAllExistingAppsWalletSubscriptions(ctx, relay) - // register a subscriber for events of "app_created" which handles creation of nostr subscription for new app - svc.eventPublisher.RegisterSubscriber(&createAppConsumer{svc: svc, relay: relay}) - // check if there are still legacy apps in DB var legacyAppCount int64 result := svc.db.Model(&db.App{}).Where("wallet_pubkey IS NULL").Count(&legacyAppCount) @@ -116,9 +120,13 @@ func (svc *service) startNostr(ctx context.Context) error { logger.Logger.WithError(err).Error("Got an error from the relay while listening to subscription.") continue } - } else { - // there is only new apps, so no single master key nostr subscription needed - <-ctx.Done() + break + } + <-relay.Context().Done() + if relay.ConnectionError != nil { + //err being non-nil means that we have an error on the websocket error channel. In this case we just try to reconnect. + logger.Logger.WithError(relay.ConnectionError).Error("Got an error from the relay, trying to reconnect") + continue } //err being nil means that the context was canceled and we should exit the program. break @@ -142,7 +150,7 @@ func (svc *service) startAllExistingAppsWalletSubscriptions(ctx context.Context, err := svc.startAppWalletSubscription(ctx, relay, *app.WalletPubkey) if err != nil { logger.Logger.WithError(err).WithFields(logrus.Fields{ - "app_id": app.ID}).Error("Failed to subscribe to wallet") + "app_id": app.ID}).Error("Subscription error") return } }(app) @@ -159,9 +167,11 @@ func (svc *service) startAppWalletSubscription(ctx context.Context, relay *nostr } // register a subscriber for "app_deleted" events, which handles nostr subscription cancel and nip47 info event deletion - svc.eventPublisher.RegisterSubscriber(&deleteAppConsumer{nostrSubscription: sub, walletPubkey: appWalletPubKey, svc: svc, relay: relay}) + deleteEventSubscriber := deleteAppConsumer{nostrSubscription: sub, walletPubkey: appWalletPubKey, svc: svc, relay: relay} + svc.eventPublisher.RegisterSubscriber(&deleteEventSubscriber) err = svc.StartSubscription(sub.Context, sub) + svc.eventPublisher.RemoveSubscriber(&deleteEventSubscriber) if err != nil { logger.Logger.WithError(err).Error("Got an error from the relay while listening to subscription.") return err