Skip to content

Commit

Permalink
fix: cleanup eventPublisher Subscribers when relay reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
frnandu committed Nov 4, 2024
1 parent 877509e commit c47d923
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
5 changes: 4 additions & 1 deletion service/delete_app_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ func (s *deleteAppConsumer) ConsumeEvent(ctx context.Context, event *events.Even
if s.walletPubkey == walletPubKey {
s.nostrSubscription.Unsub()

// remove this consumer as subscriber in eventPublisher
s.svc.eventPublisher.RemoveSubscriber(s)

// get nip47 event info for this app wallet key
nip47InfoEvent, err := s.svc.GetNip47Service().GetNip47Info(ctx, s.relay, s.walletPubkey)
if err != nil {
logger.Logger.WithError(err).Error("Could not get nip47 info event")
return
}
if nip47InfoEvent != nil {
err := s.svc.nip47Service.PublishNip47InfoDeletion(ctx, s.relay, walletPubKey, walletPrivKey, nip47InfoEvent.ID)
err = s.svc.nip47Service.PublishNip47InfoDeletion(ctx, s.relay, walletPubKey, walletPrivKey, nip47InfoEvent.ID)
if err != nil {
logger.Logger.WithError(err).WithField("event", event).Error("Failed to publish nip47 info deletion")
}
Expand Down
28 changes: 19 additions & 9 deletions service/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (svc *service) startNostr(ctx context.Context) error {
//Start infinite loop which will be only broken by canceling ctx (SIGINT)
var relay *nostr.Relay
waitToReconnectSeconds := 0

var createAppEventListener events.EventSubscriber
for i := 0; ; i++ {

// wait for a delay if any before retrying
Expand Down Expand Up @@ -87,12 +87,16 @@ func (svc *service) startNostr(ctx context.Context) error {
}
waitToReconnectSeconds = 0

// register a subscriber for events of "app_created" which handles creation of nostr subscription for new app
if createAppEventListener != nil {
svc.eventPublisher.RemoveSubscriber(createAppEventListener)
}
createAppEventListener = &createAppConsumer{svc: svc, relay: relay}
svc.eventPublisher.RegisterSubscriber(createAppEventListener)

// start each app wallet subscription which have a child derived wallet key
svc.startAllExistingAppsWalletSubscriptions(ctx, relay)

// register a subscriber for events of "app_created" which handles creation of nostr subscription for new app
svc.eventPublisher.RegisterSubscriber(&createAppConsumer{svc: svc, relay: relay})

// check if there are still legacy apps in DB
var legacyAppCount int64
result := svc.db.Model(&db.App{}).Where("wallet_pubkey IS NULL").Count(&legacyAppCount)
Expand All @@ -116,9 +120,13 @@ func (svc *service) startNostr(ctx context.Context) error {
logger.Logger.WithError(err).Error("Got an error from the relay while listening to subscription.")
continue
}
} else {
// there is only new apps, so no single master key nostr subscription needed
<-ctx.Done()
break
}
<-relay.Context().Done()
if relay.ConnectionError != nil {
//err being non-nil means that we have an error on the websocket error channel. In this case we just try to reconnect.
logger.Logger.WithError(relay.ConnectionError).Error("Got an error from the relay, trying to reconnect")
continue
}
//err being nil means that the context was canceled and we should exit the program.
break
Expand All @@ -142,7 +150,7 @@ func (svc *service) startAllExistingAppsWalletSubscriptions(ctx context.Context,
err := svc.startAppWalletSubscription(ctx, relay, *app.WalletPubkey)
if err != nil {
logger.Logger.WithError(err).WithFields(logrus.Fields{
"app_id": app.ID}).Error("Failed to subscribe to wallet")
"app_id": app.ID}).Error("Subscription error")
return
}
}(app)
Expand All @@ -159,9 +167,11 @@ func (svc *service) startAppWalletSubscription(ctx context.Context, relay *nostr
}

// register a subscriber for "app_deleted" events, which handles nostr subscription cancel and nip47 info event deletion
svc.eventPublisher.RegisterSubscriber(&deleteAppConsumer{nostrSubscription: sub, walletPubkey: appWalletPubKey, svc: svc, relay: relay})
deleteEventSubscriber := deleteAppConsumer{nostrSubscription: sub, walletPubkey: appWalletPubKey, svc: svc, relay: relay}
svc.eventPublisher.RegisterSubscriber(&deleteEventSubscriber)

err = svc.StartSubscription(sub.Context, sub)
svc.eventPublisher.RemoveSubscriber(&deleteEventSubscriber)
if err != nil {
logger.Logger.WithError(err).Error("Got an error from the relay while listening to subscription.")
return err
Expand Down

0 comments on commit c47d923

Please sign in to comment.