Skip to content

Commit

Permalink
fix: subscription goroutine leaks (#162)
Browse files Browse the repository at this point in the history
* fix subscription goroutine leaks and add timeout options
  • Loading branch information
hgiasac authored Feb 19, 2025
1 parent 3b34c44 commit 5970b87
Show file tree
Hide file tree
Showing 10 changed files with 1,092 additions and 510 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Lint

on:
push:
paths:
- "**.go"
- "go.mod"
- "go.sum"
- ".github/workflows/*.yml"
- "example/hasura/docker-compose.yaml"

jobs:
lint:
name: Run Go lint
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: "1.20"
- name: Install dependencies
run: |
go get -t -v ./...
go install ./...
- name: Format
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
run: go vet ./...
- name: Lint
uses: golangci/golangci-lint-action@v6
with:
version: latest
only-new-issues: true
skip-cache: true
args: --timeout=120s
21 changes: 3 additions & 18 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ name: Unit tests
on:
pull_request:
push:
paths:
- "**.go"
- "go.mod"
- "go.sum"
- ".github/workflows/*.yml"
- "example/hasura/docker-compose.yaml"
branches:
- master

jobs:
test-go:
name: Run Go lint and unit tests
name: Run unit and integration tests
runs-on: ubuntu-latest
permissions:
pull-requests: write
Expand All @@ -30,21 +26,10 @@ jobs:
run: |
go get -t -v ./...
go install ./...
- name: Format
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
run: go vet ./...
- name: Setup integration test infrastructure
run: |
cd ./example/hasura
docker compose up -d
- name: Lint
uses: golangci/golangci-lint-action@v6
with:
version: latest
only-new-issues: true
skip-cache: true
args: --timeout=120s
- name: Run Go unit tests for example/subscription
run: |
cd example/subscription
Expand Down
49 changes: 47 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ For more information, see package [`github.com/shurcooL/githubv4`](https://githu
- [Options](#options)
- [Subscription Protocols](#subscription-protocols)
- [Handle connection error](#handle-connection-error)
- [Connection Initialisation Timeout](#connection-initialisation-timeout)
- [WebSocket Connection Idle Timeout](#websocket-connection-idle-timeout)
- [Events](#events)
- [Custom HTTP Client](#custom-http-client)
- [Custom WebSocket client](#custom-websocket-client)
Expand Down Expand Up @@ -646,15 +648,58 @@ GraphQL servers can define custom WebSocket error codes in the 3000-4999 range.
```go
client := graphql.NewSubscriptionClient(serverEndpoint).
OnError(func(sc *graphql.SubscriptionClient, err error) error {
if strings.Contains(err.Error(), "invalid x-hasura-admin-secret/x-hasura-access-key") {
if sc.IsUnauthorized(err) || strings.Contains(err.Error(), "invalid x-hasura-admin-secret/x-hasura-access-key") {
// exit the subscription client due to unauthorized error
return err
}
// otherwise ignore the error and the client continues to run

if sc.IsInternalConnectionError(err) {
return err
}

// otherwise ignore the error and the client will restart.
return nil
})
```
##### Connection Initialisation Timeout
The connection initialisation timeout error happens when the subscription client emitted the `ConnectionInit` event but hasn't received any message for a long duration. The default timeout is a minute. You can adjust the timeout by calling the `WithConnectionInitialisationTimeout` method. This error is disabled if the timeout duration is `0`.
```go
client := graphql.NewSubscriptionClient(serverEndpoint).
WithConnectionInitialisationTimeout(2*time.Minute).
OnError(func(sc *graphql.SubscriptionClient, err error) error {
if sc.IsConnectionInitialisationTimeout(err) {
// restart the client
return nil
}

// catch other errors...

return err
})
```
##### WebSocket Connection Idle Timeout
This error happens if the websocket connection idle timeout duration is larger than `0` and the subscription client doesn't receive any message from the server, include keep-alive message for a long duration. The setting is disabled by default and can be configured by the `WithWebsocketConnectionIdleTimeout` method.
```go
client := graphql.NewSubscriptionClient(serverEndpoint).
WithWebsocketConnectionIdleTimeout(time.Minute).
OnError(func(sc *graphql.SubscriptionClient, err error) error {
if sc.IsWebsocketConnectionIdleTimeout(err) {
// restart the client
return nil
}

// catch other errors...

return err
})
```
#### Events
```Go
Expand Down
5 changes: 2 additions & 3 deletions example/hasura/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.7"

services:
postgres:
image: postgres:15
Expand All @@ -10,7 +8,7 @@ services:
POSTGRES_PASSWORD: postgrespassword

hasura:
image: hasura/graphql-engine:v2.16.1.cli-migrations-v3
image: hasura/graphql-engine:v2.45.1.cli-migrations-v3
depends_on:
- "postgres"
ports:
Expand All @@ -24,6 +22,7 @@ services:
## enable the console served by server
HASURA_GRAPHQL_ENABLE_CONSOLE: "true" # set to "false" to disable console
HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup,http-log,query-log,webhook-log,websocket-log
HASURA_GRAPHQL_LOG_LEVEL: debug
## enable debugging mode. It is recommended to disable this in production
HASURA_GRAPHQL_DEV_MODE: "true"
HASURA_GRAPHQL_ADMIN_SECRET: hasura
Expand Down
70 changes: 68 additions & 2 deletions example/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) {
t.Fatalf("failed to listen OnSubscriptionComplete event. got %+v, want: %+v", len(subscriptionResults), len(fixtures))
}
for i, s := range subscriptionResults {
if s.GetID() != fixtures[i].ExpectedID {
t.Fatalf("%d: subscription id not matched, got: %s, want: %s", i, s.GetPayload().Query, fixtures[i].ExpectedPayload.Query)
if s.GetKey() != fixtures[i].ExpectedID {
t.Fatalf("%d: subscription id not matched, got: %s, want: %s", i, s.GetKey(), fixtures[i].ExpectedID)
}
if s.GetPayload().Query != fixtures[i].ExpectedPayload.Query {
t.Fatalf("%d: query output not matched, got: %s, want: %s", i, s.GetPayload().Query, fixtures[i].ExpectedPayload.Query)
Expand All @@ -556,3 +556,69 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
func TestSubscription_WithSyncMode(t *testing.T) {
testSubscription_LifeCycleEvents(t, true)
}

func TestTransportWS_ConnectionIdleTimeout(t *testing.T) {
server := subscription_setupServer(8081)
_, subscriptionClient := subscription_setupClients(8081)
msg := randomID()
go func() {
if err := server.ListenAndServe(); err != nil {
log.Println(err)
}
}()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer func() {
_ = server.Shutdown(ctx)
}()
defer cancel()

subscriptionClient.
WithWebsocketConnectionIdleTimeout(2 * time.Second).
OnError(func(sc *gql.SubscriptionClient, err error) error {
return err
})

/*
subscription {
helloSaid {
id
msg
}
}
*/
var sub struct {
HelloSaid struct {
ID gql.String
Message gql.String `graphql:"msg" json:"msg"`
} `graphql:"helloSaid" json:"helloSaid"`
}

_, err := subscriptionClient.Subscribe(sub, nil, func(data []byte, e error) error {
if e != nil {
t.Fatalf("got error: %v, want: nil", e)
return nil
}

log.Println("result", string(data))
e = json.Unmarshal(data, &sub)
if e != nil {
t.Fatalf("got error: %v, want: nil", e)
return nil
}

if sub.HelloSaid.Message != gql.String(msg) {
t.Fatalf("subscription message does not match. got: %s, want: %s", sub.HelloSaid.Message, msg)
}

return errors.New("exit")
})

if err != nil {
t.Fatalf("got error: %v, want: nil", err)
}

if err := subscriptionClient.Run(); err == nil || !errors.Is(err, gql.ErrWebsocketConnectionIdleTimeout) {
t.Errorf("got error: %v, want: %s", err, gql.ErrWebsocketConnectionIdleTimeout)
}
}
Loading

0 comments on commit 5970b87

Please sign in to comment.