From b261afa02993e6c75d3761adc7d7b8a3bffab33c Mon Sep 17 00:00:00 2001 From: Airon <111259764+airon-applyinnovations@users.noreply.github.com> Date: Sat, 2 Sep 2023 10:34:00 +0800 Subject: [PATCH 1/5] feat: added optional WithKeepAlive and WithRetryDelay (#107) Some servers require the client to send ping to prevent the connection from being broken due to idling. We have added WithKeepAlive which accepts time.Duration will be used as an interval for doing a continuous ping to the server. This is also implemented in npm package graphql-ws. WithRetryDelay makes retry delay customizable. If not set it will use the default (currently hard coded 1 second). --------- Co-authored-by: applyinnovations --- subscription.go | 57 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/subscription.go b/subscription.go index 0664590..2efd32e 100644 --- a/subscription.go +++ b/subscription.go @@ -102,6 +102,7 @@ func (om OperationMessage) String() string { type WebsocketConn interface { ReadJSON(v interface{}) error WriteJSON(v interface{}) error + Ping() error Close() error // SetReadLimit sets the maximum size in bytes for a message read from the peer. If a // message exceeds the limit, the connection sends a close message to the peer @@ -363,6 +364,8 @@ type SubscriptionClient struct { onError func(sc *SubscriptionClient, err error) error errorChan chan error exitWhenNoSubscription bool + keepAliveInterval time.Duration + retryDelay time.Duration mutex sync.Mutex } @@ -377,6 +380,8 @@ func NewSubscriptionClient(url string) *SubscriptionClient { errorChan: make(chan error), protocol: &subscriptionsTransportWS{}, exitWhenNoSubscription: true, + keepAliveInterval: 0 * time.Second, + retryDelay: 1 * time.Second, context: &SubscriptionContext{ subscriptions: make(map[string]Subscription), }, @@ -459,6 +464,39 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti return sc } +// Keep alive subroutine to send ping on specified interval +func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Ping the websocket. You might want to handle any potential errors. + err := c.Ping() + if err != nil { + fmt.Printf("%s => Failed to ping server\n", time.Now().Format(time.TimeOnly)) + // Handle the error, maybe log it, close the connection, etc. + } + case <-ctx.Done(): + // If the context is cancelled, stop the pinging. + return + } + } +} + +// WithKeepAlive programs the websocket to ping on the specified interval +func (sc *SubscriptionClient) WithKeepAlive(interval time.Duration) *SubscriptionClient { + sc.keepAliveInterval = interval + return sc +} + +// WithRetryDelay set the delay time before retrying the connection +func (sc *SubscriptionClient) WithRetryDelay(delay time.Duration) *SubscriptionClient { + sc.retryDelay = delay + return sc +} + // WithLog sets logging function to print out received messages. By default, nothing is printed func (sc *SubscriptionClient) WithLog(logger func(args ...interface{})) *SubscriptionClient { sc.context.log = logger @@ -580,8 +618,8 @@ func (sc *SubscriptionClient) init() error { } return err } - ctx.Log(fmt.Sprintf("%s. retry in second...", err.Error()), "client", GQLInternal) - time.Sleep(time.Second) + ctx.Log(fmt.Sprintf("%s. retry in %d second...", err.Error(), sc.retryDelay/time.Second), "client", GQLInternal) + time.Sleep(sc.retryDelay) } } @@ -709,6 +747,10 @@ func (sc *SubscriptionClient) Run() error { sc.setClientStatus(scStatusRunning) ctx := subContext.GetContext() + if sc.keepAliveInterval > 0 { + go startKeepAlive(ctx, conn, sc.keepAliveInterval) + } + go func() { for { select { @@ -966,6 +1008,13 @@ func (wh *WebsocketHandler) ReadJSON(v interface{}) error { return wsjson.Read(ctx, wh.Conn, v) } +// Ping sends a ping to the peer and waits for a pong +func (wh *WebsocketHandler) Ping() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return wh.Conn.Ping(ctx) +} + // Close implements the function to close the websocket connection func (wh *WebsocketHandler) Close() error { return wh.Conn.Close(websocket.StatusNormalClosure, "close websocket") @@ -977,9 +1026,7 @@ func (wh *WebsocketHandler) GetCloseStatus(err error) int32 { // context timeout error returned from ReadJSON or WriteJSON // try to ping the server, if failed return abnormal closeure error if errors.Is(err, context.DeadlineExceeded) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if pingErr := wh.Ping(ctx); pingErr != nil { + if pingErr := wh.Ping(); pingErr != nil { return int32(websocket.StatusNoStatusRcvd) } return -1 From 43b90bcfebe0df74164ba120d29235d234112916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zden=C4=9Bk=20Dev=C3=A1t=C3=BD?= Date: Mon, 18 Sep 2023 11:34:10 +0200 Subject: [PATCH 2/5] Add debugging details to readme (#109) * Add debugging details to readme --------- Co-authored-by: Toan Nguyen --- README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index bed385e..136ef8e 100644 --- a/README.md +++ b/README.md @@ -906,23 +906,24 @@ Enable debug mode with the `WithDebug` function. If the request is failed, the r } ``` -Because the GraphQL query string is generated in runtime using reflection, it isn't really safe. To assure the GraphQL query is expected, it's necessary to write some unit test for query construction. - +For debugging queries, you can use `Construct*` functions to see what the generated query looks like: ```go // ConstructQuery build GraphQL query string from struct and variables func ConstructQuery(v interface{}, variables map[string]interface{}, options ...Option) (string, error) -// ConstructQuery build GraphQL mutation string from struct and variables +// ConstructMutation build GraphQL mutation string from struct and variables func ConstructMutation(v interface{}, variables map[string]interface{}, options ...Option) (string, error) // ConstructSubscription build GraphQL subscription string from struct and variables -func ConstructSubscription(v interface{}, variables map[string]interface{}, options ...Option) (string, error) +func ConstructSubscription(v interface{}, variables map[string]interface{}, options ...Option) (string, string, error) // UnmarshalGraphQL parses the JSON-encoded GraphQL response data and stores // the result in the GraphQL query data structure pointed to by v. func UnmarshalGraphQL(data []byte, v interface{}) error ``` +Because the GraphQL query string is generated in runtime using reflection, it isn't really safe. To assure the GraphQL query is expected, it's necessary to write some unit test for query construction. + Directories ----------- From dacf52db2541995f8cce5f23feba2ca744f5dd6d Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Sun, 3 Dec 2023 09:14:57 +0700 Subject: [PATCH 3/5] add sync mode for subscription events (#114) --- README.md | 4 +++- subscription.go | 18 ++++++++++++++++-- subscription_test.go | 12 +++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 136ef8e..f0d699d 100644 --- a/README.md +++ b/README.md @@ -590,7 +590,9 @@ client. WithExitWhenNoSubscription(false). // WithRetryStatusCodes allow retry the subscription connection when receiving one of these codes // the input parameter can be number string or range, e.g 4000-5000 - WithRetryStatusCodes("4000", "4000-4050") + WithRetryStatusCodes("4000", "4000-4050"). + // WithSyncMode subscription messages are executed in sequence (without goroutine) + WithSyncMode(true) ``` #### Subscription Protocols diff --git a/subscription.go b/subscription.go index 2efd32e..e34d6c9 100644 --- a/subscription.go +++ b/subscription.go @@ -364,6 +364,7 @@ type SubscriptionClient struct { onError func(sc *SubscriptionClient, err error) error errorChan chan error exitWhenNoSubscription bool + syncMode bool keepAliveInterval time.Duration retryDelay time.Duration mutex sync.Mutex @@ -464,6 +465,12 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti return sc } +// WithSyncMode subscription messages are executed in sequence (without goroutine) +func (sc *SubscriptionClient) WithSyncMode(value bool) *SubscriptionClient { + sc.syncMode = value + return sc +} + // Keep alive subroutine to send ping on specified interval func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) { ticker := time.NewTicker(interval) @@ -806,13 +813,20 @@ func (sc *SubscriptionClient) Run() error { if sub == nil { sub = &Subscription{} } - go func() { + + execMessage := func() { if err := sc.protocol.OnMessage(subContext, *sub, message); err != nil { sc.errorChan <- err } sc.checkSubscriptionStatuses(subContext) - }() + } + + if sc.syncMode { + execMessage() + } else { + go execMessage() + } } } }() diff --git a/subscription_test.go b/subscription_test.go index 2e3b9b7..5add1f2 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -13,7 +13,8 @@ import ( "nhooyr.io/websocket" ) -func TestSubscription_LifeCycleEvents(t *testing.T) { +func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) { + server := subscription_setupServer(8082) client, subscriptionClient := subscription_setupClients(8082) msg := randomID() @@ -84,6 +85,7 @@ func TestSubscription_LifeCycleEvents(t *testing.T) { subscriptionClient = subscriptionClient. WithExitWhenNoSubscription(false). WithTimeout(3 * time.Second). + WithSyncMode(syncMode). OnConnected(func() { lock.Lock() defer lock.Unlock() @@ -200,6 +202,14 @@ func TestSubscription_LifeCycleEvents(t *testing.T) { } } +func TestSubscription_LifeCycleEvents(t *testing.T) { + testSubscription_LifeCycleEvents(t, false) +} + +func TestSubscription_WithSyncMode(t *testing.T) { + testSubscription_LifeCycleEvents(t, true) +} + func TestSubscription_WithRetryStatusCodes(t *testing.T) { stop := make(chan bool) msg := randomID() From 57b21fd5b5991927ddfdc5093d4006abd1059e15 Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Sat, 6 Jan 2024 21:27:06 +0700 Subject: [PATCH 4/5] update latest package 20240106 (#118) --- go.mod | 15 ++++++++------- go.sum | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 16b0aa3..85acea0 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,18 @@ module github.com/hasura/go-graphql-client go 1.20 require ( - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.5.0 github.com/graph-gophers/graphql-go v1.5.0 github.com/graph-gophers/graphql-transport-ws v0.0.2 - nhooyr.io/websocket v1.8.7 + nhooyr.io/websocket v1.8.10 ) require ( - github.com/gorilla/websocket v1.5.0 // indirect - github.com/klauspost/compress v1.16.7 // indirect - golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + github.com/gorilla/websocket v1.5.1 // indirect + github.com/klauspost/compress v1.17.4 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.16.0 // indirect ) -replace github.com/gin-gonic/gin v1.6.3 => github.com/gin-gonic/gin v1.7.7 +replace github.com/gin-gonic/gin v1.6.3 => github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index d9d7fcd..0953f3e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -30,10 +31,14 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc= github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os= github.com/graph-gophers/graphql-transport-ws v0.0.2 h1:DbmSkbIGzj8SvHei6n8Mh9eLQin8PtA8xY9eCzjRpvo= @@ -44,6 +49,8 @@ github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eT github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= @@ -71,8 +78,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b h1:Qwe1rC8PSniVfAFPFJeyUkB+zcysC3RgJBAGk7eqBEU= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -81,6 +93,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -97,3 +112,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= From 48aa45cb39ed78c5bee391e8c7043078963eb0c2 Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Sun, 7 Jan 2024 00:00:01 +0700 Subject: [PATCH 5/5] fix: handle closed error on subscription (#119) --- subscription.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/subscription.go b/subscription.go index e34d6c9..0318fc1 100644 --- a/subscription.go +++ b/subscription.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "net" "net/http" "strconv" "strings" @@ -293,6 +294,9 @@ func (sc *SubscriptionContext) Close() error { sc.Cancel() + if errors.Is(err, net.ErrClosed) { + return nil + } return err } @@ -767,7 +771,7 @@ func (sc *SubscriptionClient) Run() error { var message OperationMessage if err := conn.ReadJSON(&message); err != nil { // manual EOF check - if err == io.EOF || strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection reset by peer") { + if err == io.EOF || strings.Contains(err.Error(), "EOF") || errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "connection reset by peer") { sc.errorChan <- errRetry return } @@ -922,7 +926,7 @@ func (sc *SubscriptionClient) close(ctx *SubscriptionContext) (err error) { continue } if sub.status == SubscriptionRunning { - if err := sc.protocol.Unsubscribe(ctx, sub); err != nil { + if err := sc.protocol.Unsubscribe(ctx, sub); err != nil && !errors.Is(err, net.ErrClosed) { unsubscribeErrors[key] = err } }