From 78c4a5917ac65cc7cb5323718bd765024e55c4e9 Mon Sep 17 00:00:00 2001 From: David Thorpe Date: Sat, 27 Jul 2024 16:45:21 +0200 Subject: [PATCH] Added JSON streaming callback --- client.go | 21 +++++++++++++++++++-- pkg/multipart/multipart.go | 2 +- requestopts.go | 9 +++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index de5120e..7666ffa 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "encoding/xml" + "errors" "fmt" "io" "mime" @@ -47,6 +48,10 @@ type Client struct { type ClientOpt func(*Client) error +// Callback for json stream events, return an error if you want to stop streaming +// with an error and io.EOF if you want to stop streaming and return success +type JsonStreamCallback func(v any) error + /////////////////////////////////////////////////////////////////////////////// // GLOBALS @@ -302,8 +307,20 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out // Decode the body switch mimetype { case ContentTypeJson: - if err := json.NewDecoder(response.Body).Decode(out); err != nil { - return err + // JSON decode is streamable + dec := json.NewDecoder(response.Body) + for { + if err := dec.Decode(out); err == io.EOF { + break + } else if err != nil { + return err + } else if reqopts.jsonStreamCallback != nil { + if err := reqopts.jsonStreamCallback(out); errors.Is(err, io.EOF) { + break + } else if err != nil { + return err + } + } } case ContentTypeTextStream: if err := NewTextStream().Decode(response.Body, reqopts.textStreamCallback); err != nil { diff --git a/pkg/multipart/multipart.go b/pkg/multipart/multipart.go index a6d91dc..55d064d 100644 --- a/pkg/multipart/multipart.go +++ b/pkg/multipart/multipart.go @@ -55,7 +55,7 @@ func NewMultipartEncoder(w io.Writer) *Encoder { } } -// NewFormEncoder creates a new encoder object, whichwrites +// NewFormEncoder creates a new encoder object, which writes // application/x-www-form-urlencoded to the io.Writer func NewFormEncoder(w io.Writer) *Encoder { return &Encoder{ diff --git a/requestopts.go b/requestopts.go index b026301..46729b2 100644 --- a/requestopts.go +++ b/requestopts.go @@ -17,6 +17,7 @@ type requestOpts struct { *http.Request noTimeout bool // OptNoTimeout textStreamCallback TextStreamCallback // OptTextStreamCallback + jsonStreamCallback JsonStreamCallback // OptJsonStreamCallback } type RequestOpt func(*requestOpts) error @@ -103,3 +104,11 @@ func OptTextStreamCallback(fn TextStreamCallback) RequestOpt { return nil } } + +// OptJsonStreamCallback is called for each decoded JSON event +func OptJsonStreamCallback(fn JsonStreamCallback) RequestOpt { + return func(r *requestOpts) error { + r.jsonStreamCallback = fn + return nil + } +}