Skip to content

Commit

Permalink
fix tests and add lastReceivedMessageAt
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Feb 9, 2025
1 parent ea771ec commit ba04041
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
4 changes: 2 additions & 2 deletions example/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) {
t.Fatalf("failed to listen OnSubscriptionComplete event. got %+v, want: %+v", len(subscriptionResults), len(fixtures))
}
for i, s := range subscriptionResults {
if s.GetID() != fixtures[i].ExpectedID {
t.Fatalf("%d: subscription id not matched, got: %s, want: %s", i, s.GetPayload().Query, fixtures[i].ExpectedPayload.Query)
if s.GetKey() != fixtures[i].ExpectedID {
t.Fatalf("%d: subscription id not matched, got: %s, want: %s", i, s.GetKey(), fixtures[i].ExpectedID)
}
if s.GetPayload().Query != fixtures[i].ExpectedPayload.Query {
t.Fatalf("%d: query output not matched, got: %s, want: %s", i, s.GetPayload().Query, fixtures[i].ExpectedPayload.Query)
Expand Down
20 changes: 15 additions & 5 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ type SubscriptionContext struct {
client *SubscriptionClient
websocketConn WebsocketConn

connectionInitAt time.Time
acknowledged int32
cancel context.CancelFunc
subscriptions map[string]Subscription
mutex sync.Mutex
connectionInitAt time.Time
lastReceivedMessageAt time.Time
acknowledged int32
cancel context.CancelFunc
subscriptions map[string]Subscription
mutex sync.Mutex
}

// Log prints condition logging with message type filters
Expand Down Expand Up @@ -230,6 +231,13 @@ func (sc *SubscriptionContext) SetWebsocketConn(conn WebsocketConn) {
sc.websocketConn = conn
}

func (sc *SubscriptionContext) setLastReceivedMessageAt(t time.Time) {
sc.mutex.Lock()
defer sc.mutex.Unlock()

sc.lastReceivedMessageAt = t
}

// GetSubscription get the subscription state by id
func (sc *SubscriptionContext) GetSubscription(id string) *Subscription {
sc.mutex.Lock()
Expand Down Expand Up @@ -384,6 +392,7 @@ func (sc *SubscriptionContext) init(parentContext context.Context) error {
}
}

// run the subscription client goroutine session to receive WebSocket messages.
func (sc *SubscriptionContext) run() {
for {
select {
Expand Down Expand Up @@ -446,6 +455,7 @@ func (sc *SubscriptionContext) run() {
continue
}

sc.setLastReceivedMessageAt(time.Now())
sub := sc.GetSubscription(message.ID)
if sub == nil {
sub = &Subscription{}
Expand Down

0 comments on commit ba04041

Please sign in to comment.