Skip to content

Commit

Permalink
session: implement Call and CallStreaming
Browse files Browse the repository at this point in the history
  • Loading branch information
enr0n committed Aug 20, 2024
1 parent eac4c79 commit 2c7e180
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
54 changes: 53 additions & 1 deletion vici/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"net"
"sync"
)
Expand Down Expand Up @@ -194,20 +195,71 @@ func withTestConn(conn net.Conn) SessionOption {
// the command fails, the response Message is returned along with the error returned by
// Message.Err.
func (s *Session) CommandRequest(cmd string, msg *Message) (*Message, error) {
return s.Call(context.Background(), cmd, msg)
}

// Call
func (s *Session) Call(ctx context.Context, cmd string, in *Message) (*Message, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.cc == nil {
return nil, errors.New("session closed")
}

resp, err := s.request(context.Background(), cmd, msg)
resp, err := s.request(ctx, cmd, m)

Check failure on line 209 in vici/session.go

View workflow job for this annotation

GitHub Actions / lint (1.23.x, ubuntu-latest)

undefined: m

Check failure on line 209 in vici/session.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

undefined: m
if err != nil {
return nil, err
}

return resp, resp.Err()
}

// CallStreaming
func (s *Session) CallStreaming(ctx context.Context, cmd string, event string, in *Message) iter.Seq[*Message, error] {

Check failure on line 218 in vici/session.go

View workflow job for this annotation

GitHub Actions / lint (1.23.x, ubuntu-latest)

too many type arguments for type Seq: have 2, want 1

Check failure on line 218 in vici/session.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

too many type arguments for type Seq: have 2, want 1
return func(yield func(*Message, error) bool) {

Check failure on line 219 in vici/session.go

View workflow job for this annotation

GitHub Actions / lint (1.23.x, ubuntu-latest)

cannot use func(yield func(*Message, error) bool) {…} (value of type func(yield func(*Message, error) bool)) as iter.Seq[*Message, error] value in return statement

Check failure on line 219 in vici/session.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

cannot use func(yield func(*Message, error) bool) {…} (value of type func(yield func(*Message, error) bool)) as iter.Seq[*Message, error] value in return statement
s.mu.Lock()
defer s.mu.Unlock()

if s.cc == nil {
yield(nil, errors.New("session closed"))
return
}

if err := s.eventRegister(ctx, event); err != nil {
yield(nil, err)
return
}
// nolint
defer s.eventUnregister(ctx, event)

if err := s.cc.packetWrite(ctx, newPacket(pktCmdRequest, cmd, in)); err != nil {
yield(nil, err)
return
}

for {
p, err := s.cc.packetRead(ctx)
if err != nil {
yield(nil, err)
return
}

switch p.ptype {
case pktEvent:
if !yield(p.msg, p.msg.Err()) {
return
}
case pktCmdResponse:
yield(p.msg, p.msg.Err())
return // End of event stream
default:
yield(nil, fmt.Errorf("%v: %v", errUnexpectedResponse, p.ptype))
return
}
}
}
}

// StreamedCommandRequest sends a streamed command request to the server. StreamedCommandRequest
// behaves like CommandRequest, but accepts an event argument, which specifies the event type
// to stream while the command request is active. The complete stream of messages received from
Expand Down
18 changes: 8 additions & 10 deletions vici/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package vici

import (
"context"
"flag"
"fmt"
"net"
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestCommandRequestAfterClose(t *testing.T) {
// only meant to test the package API, and the specific commands used are out
// of convenience; any command that satisfies the need of the test could be used.
//
// For example, TestStreamedCommandRequest uses the 'list-authorities' command, but
// For example, TestCallStreaming uses the 'list-authorities' command, but
// any event-streaming vici command could be used.
//
// These tests are only run when the -integration flag is set to true.
Expand Down Expand Up @@ -119,10 +120,10 @@ func TestCommandRequest(t *testing.T) {
}
}

// TestStreamedCommandRequest tests StreamedCommandRequest by calling the
// TestCallStreaming tests CallStreaming by calling the
// 'list-authorities' command. Likely, there will be no authorities returned,
// but make sure any Messages that are streamed have non-nil err.
func TestStreamedCommandRequest(t *testing.T) {
func TestCallStreaming(t *testing.T) {
maybeSkipIntegrationTest(t)

s, err := NewSession()
Expand All @@ -131,14 +132,11 @@ func TestStreamedCommandRequest(t *testing.T) {
}
defer s.Close()

ms, err := s.StreamedCommandRequest("list-authorities", "list-authority", nil)
if err != nil {
t.Fatalf("Failed to list authorities: %v", err)
}
resp := s.CallStreaming(context.Background(), "list-authorities", "list-authority", nil)

for i, m := range ms {
if m.Err() != nil {
t.Fatalf("Got error in message #%d: %v", i+1, m.Err())
for m, err := range resp {

Check failure on line 137 in vici/session_test.go

View workflow job for this annotation

GitHub Actions / lint (1.23.x, ubuntu-latest)

cannot range over resp (variable of type iter.Seq[*Message, error]) (typecheck)

Check failure on line 137 in vici/session_test.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

cannot range over resp (variable of type iter.Seq[*Message, error])
if err != nil {
t.Fatalf("Got error from CallStreaming: %v", err)
}
}
}
Expand Down

0 comments on commit 2c7e180

Please sign in to comment.