Skip to content

Commit

Permalink
add ctx to Recv (#259)
Browse files Browse the repository at this point in the history
boolangery authored Nov 14, 2024
1 parent 59d8ecb commit 9496e42
Showing 22 changed files with 94 additions and 37 deletions.
25 changes: 16 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -2954,6 +2954,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -2971,7 +2972,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -2991,7 +2992,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3016,6 +3017,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -3034,7 +3036,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3053,7 +3055,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3078,6 +3080,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -3096,7 +3099,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3130,6 +3133,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -3141,7 +3145,7 @@ func main() {
}

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3165,6 +3169,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -3182,7 +3187,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3205,6 +3210,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -3217,7 +3223,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -3240,6 +3246,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -3254,7 +3261,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
4 changes: 2 additions & 2 deletions programs/serum/rpc.go
Original file line number Diff line number Diff line change
@@ -101,14 +101,14 @@ func FetchMarket(ctx context.Context, rpcCli *rpc.Client, marketAddr solana.Publ
return meta, nil
}

func StreamOpenOrders(client *ws.Client) error {
func StreamOpenOrders(ctx context.Context, client *ws.Client) error {
sub, err := client.ProgramSubscribe(DEXProgramIDV2, rpc.CommitmentSingleGossip)
if err != nil {
return fmt.Errorf("unable to subscribe to programID %q: %w", DEXProgramIDV2, err)
}
count := 0
for {
d, err := sub.Recv()
d, err := sub.Recv(ctx)
if err != nil {
return fmt.Errorf("received error from programID subscription: %w", err)
}
2 changes: 1 addition & 1 deletion programs/serum/rpc_test.go
Original file line number Diff line number Diff line change
@@ -66,6 +66,6 @@ func TestStreamOpenOrders(t *testing.T) {
client, err := ws.Connect(context.Background(), rpcURL)
require.NoError(t, err)

err = StreamOpenOrders(client)
err = StreamOpenOrders(context.Background(), client)
require.NoError(t, err)
}
6 changes: 5 additions & 1 deletion rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package ws

import (
"context"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
)
@@ -83,8 +85,10 @@ type AccountSubscription struct {
sub *Subscription
}

func (sw *AccountSubscription) Recv() (*AccountResult, error) {
func (sw *AccountSubscription) Recv(ctx context.Context) (*AccountResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
5 changes: 4 additions & 1 deletion rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package ws

import (
"context"
"fmt"

"github.com/gagliardetto/solana-go"
@@ -148,8 +149,10 @@ type BlockSubscription struct {
sub *Subscription
}

func (sw *BlockSubscription) Recv() (*BlockResult, error) {
func (sw *BlockSubscription) Recv(ctx context.Context) (*BlockResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
8 changes: 4 additions & 4 deletions rpc/ws/client_test.go
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ func Test_AccountSubscribe(t *testing.T) {
sub, err := c.AccountSubscribe(accountID, "")
require.NoError(t, err)

data, err := sub.Recv()
data, err := sub.Recv(context.Background())
if err != nil {
fmt.Println("receive an error: ", err)
return
@@ -95,7 +95,7 @@ func Test_AccountSubscribeWithHttpHeader(t *testing.T) {
sub.Unsubscribe()
}(sub)

data, err := sub.Recv()
data, err := sub.Recv(context.Background())
if err != nil {
t.Errorf("Received an error: %v", err)
}
@@ -127,7 +127,7 @@ func Test_ProgramSubscribe(t *testing.T) {
require.NoError(t, err)

for {
data, err := sub.Recv()
data, err := sub.Recv(context.Background())
if err != nil {
fmt.Println("receive an error: ", err)
return
@@ -148,7 +148,7 @@ func Test_SlotSubscribe(t *testing.T) {
sub, err := c.SlotSubscribe()
require.NoError(t, err)

data, err := sub.Recv()
data, err := sub.Recv(context.Background())
if err != nil {
fmt.Println("receive an error: ", err)
return
4 changes: 2 additions & 2 deletions rpc/ws/examples/accountSubscribe/accountSubscribe.go
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(context.Background())
if err != nil {
panic(err)
}
@@ -62,7 +62,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(context.Background())
if err != nil {
panic(err)
}
5 changes: 3 additions & 2 deletions rpc/ws/examples/logsSubscribe/logsSubscribe.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -43,7 +44,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
@@ -62,7 +63,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
3 changes: 2 additions & 1 deletion rpc/ws/examples/programSubscribe/programSubscribe.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -43,7 +44,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
3 changes: 2 additions & 1 deletion rpc/ws/examples/rootSubscribe/rootSubscribe.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -35,7 +36,7 @@ func main() {
}

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
3 changes: 2 additions & 1 deletion rpc/ws/examples/signatureSubscribe/signatureSubscribe.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -42,7 +43,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
3 changes: 2 additions & 1 deletion rpc/ws/examples/slotSubscribe/slotSubscribe.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.TestNet_WS)
if err != nil {
panic(err)
@@ -36,7 +37,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
3 changes: 2 additions & 1 deletion rpc/ws/examples/voteSubscribe/voteSubscribe.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
)

func main() {
ctx := context.Background()
client, err := ws.Connect(context.Background(), rpc.MainNetBeta_WS)
if err != nil {
panic(err)
@@ -38,7 +39,7 @@ func main() {
defer sub.Unsubscribe()

for {
got, err := sub.Recv()
got, err := sub.Recv(ctx)
if err != nil {
panic(err)
}
6 changes: 5 additions & 1 deletion rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package ws

import (
"context"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
)
@@ -107,8 +109,10 @@ type LogSubscription struct {
sub *Subscription
}

func (sw *LogSubscription) Recv() (*LogResult, error) {
func (sw *LogSubscription) Recv(ctx context.Context) (*LogResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
6 changes: 5 additions & 1 deletion rpc/ws/parsedBlockSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package ws

import (
"context"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
)
@@ -92,8 +94,10 @@ type ParsedBlockSubscription struct {
sub *Subscription
}

func (sw *ParsedBlockSubscription) Recv() (*ParsedBlockResult, error) {
func (sw *ParsedBlockSubscription) Recv(ctx context.Context) (*ParsedBlockResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d := <-sw.sub.stream:
return d.(*ParsedBlockResult), nil
case err := <-sw.sub.err:
6 changes: 5 additions & 1 deletion rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package ws

import (
"context"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
)
@@ -86,8 +88,10 @@ type ProgramSubscription struct {
sub *Subscription
}

func (sw *ProgramSubscription) Recv() (*ProgramResult, error) {
func (sw *ProgramSubscription) Recv(ctx context.Context) (*ProgramResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
6 changes: 5 additions & 1 deletion rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@

package ws

import "context"

type RootResult uint64

// SignatureSubscribe subscribes to receive notification
@@ -42,8 +44,10 @@ type RootSubscription struct {
sub *Subscription
}

func (sw *RootSubscription) Recv() (*RootResult, error) {
func (sw *RootSubscription) Recv(ctx context.Context) (*RootResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
5 changes: 4 additions & 1 deletion rpc/ws/signatureSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package ws

import (
"context"
"fmt"
"time"

@@ -67,8 +68,10 @@ type SignatureSubscription struct {
sub *Subscription
}

func (sw *SignatureSubscription) Recv() (*SignatureResult, error) {
func (sw *SignatureSubscription) Recv(ctx context.Context) (*SignatureResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
6 changes: 5 additions & 1 deletion rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@

package ws

import "context"

type SlotResult struct {
Parent uint64 `json:"parent"`
Root uint64 `json:"root"`
@@ -45,8 +47,10 @@ type SlotSubscription struct {
sub *Subscription
}

func (sw *SlotSubscription) Recv() (*SlotResult, error) {
func (sw *SlotSubscription) Recv(ctx context.Context) (*SlotResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
10 changes: 8 additions & 2 deletions rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,11 @@

package ws

import "github.com/gagliardetto/solana-go"
import (
"context"

"github.com/gagliardetto/solana-go"
)

type SlotsUpdatesResult struct {
// The parent slot.
@@ -77,8 +81,10 @@ type SlotsUpdatesSubscription struct {
sub *Subscription
}

func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) {
func (sw *SlotsUpdatesSubscription) Recv(ctx context.Context) (*SlotsUpdatesResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
6 changes: 5 additions & 1 deletion rpc/ws/subscription.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@

package ws

import "context"

type Subscription struct {
req *request
subID uint64
@@ -47,8 +49,10 @@ func newSubscription(
}
}

func (s *Subscription) Recv() (interface{}, error) {
func (s *Subscription) Recv(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d := <-s.stream:
return d, nil
case err := <-s.err:
6 changes: 5 additions & 1 deletion rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package ws

import (
"context"

"github.com/gagliardetto/solana-go"
)

@@ -59,8 +61,10 @@ type VoteSubscription struct {
sub *Subscription
}

func (sw *VoteSubscription) Recv() (*VoteResult, error) {
func (sw *VoteSubscription) Recv(ctx context.Context) (*VoteResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed

0 comments on commit 9496e42

Please sign in to comment.