Skip to content

Commit

Permalink
Merge pull request #26 from mutablelogic/v1
Browse files Browse the repository at this point in the history
Added streaming chat, and tool calling for openai
  • Loading branch information
djthorpe authored May 24, 2024
2 parents 2ec7d13 + d26484c commit f96eb6f
Show file tree
Hide file tree
Showing 15 changed files with 576 additions and 135 deletions.
123 changes: 95 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This repository contains a generic HTTP client which can be adapted to provide:
* Ability to send files and data of type `multipart/form-data`
* Ability to send data of type `application/x-www-form-urlencoded`
* Debugging capabilities to see the request and response data
* Streaming JSON responses
* Streaming text events

API Documentation: https://pkg.go.dev/github.com/mutablelogic/go-client

Expand Down Expand Up @@ -58,28 +58,31 @@ func main() {
Various options can be passed to the client `New` method to control its behaviour:

* `OptEndpoint(value string)` sets the endpoint for all requests
* `OptTimeout(value time.Duration)` sets the timeout on any request, which defaults to 30 seconds
* `OptUserAgent(value string)` sets the user agent string on each API request
* `OptTrace(w io.Writer, verbose bool)` allows you to debug the request and response data.
When `verbose` is set to true, it also displays the payloads
* `OptStrict()` turns on strict content type checking on anything returned from the API
* `OptRateLimit(value float32)` sets the limit on number of requests per second and the API will sleep to regulate
the rate limit when exceeded
* `OptReqToken(value Token)` sets a request token for all client requests. This can be overridden by the client
for individual requests using `OptToken`
* `OptSkipVerify()` skips TLS certificate domain verification
* `OptHeader(key, value string)` appends a custom header to each request
* `OptTimeout(value time.Duration)` sets the timeout on any request, which defaults to 30 seconds.
Timeouts can be ignored on a request-by-request basis using the `OptNoTimeout` option (see below).
* `OptUserAgent(value string)` sets the user agent string on each API request.
* `OptTrace(w io.Writer, verbose bool)` allows you to debug the request and response data.
When `verbose` is set to true, it also displays the payloads.
* `OptStrict()` turns on strict content type checking on anything returned from the API.
* `OptRateLimit(value float32)` sets the limit on number of requests per second and the API
will sleep to regulate the rate limit when exceeded.
* `OptReqToken(value Token)` sets a request token for all client requests. This can be
overridden by the client for individual requests using `OptToken` (see below).
* `OptSkipVerify()` skips TLS certificate domain verification.
* `OptHeader(key, value string)` appends a custom header to each request.

## Usage with a payload

The first argument to the `Do` method is the payload to send to the server, when set. You can create a payload
using the following methods:
The first argument to the `Do` method is the payload to send to the server, when set.
You can create a payload using the following methods:

* `client.NewRequest()` returns a new empty payload which defaults to GET.
* `client.NewJSONRequest(payload any, accept string)` returns a new request with a JSON payload which defaults to POST.
* `client.NewMultipartRequest(payload any, accept string)` returns a new request with a Multipart Form data payload which
defaults to POST.
* `client.NewFormRequest(payload any, accept string)` returns a new request with a Form data payload which defaults to POST.
* `client.NewJSONRequest(payload any, accept string)` returns a new request with
a JSON payload which defaults to POST.
* `client.NewMultipartRequest(payload any, accept string)` returns a new request with
a Multipart Form data payload which defaults to POST.
* `client.NewFormRequest(payload any, accept string)` returns a new request with a
Form data payload which defaults to POST.

For example,

Expand Down Expand Up @@ -131,7 +134,7 @@ type Payload interface {

## Request options

The signature of the `Do` method is:
The signature of the `Do` method is as follows:

```go
type Client interface {
Expand All @@ -143,16 +146,19 @@ type Client interface {
}
```

Various options can be passed to modify each individual request when using the `Do` method:
If you pass a context to the `DoWithContext` method, then the request can be
cancelled using the context in addition to the timeout. Various options can be passed to
modify each individual request when using the `Do` method:

* `OptReqEndpoint(value string)` sets the endpoint for the request
* `OptPath(value ...string)` appends path elements onto a request endpoint
* `OptToken(value Token)` adds an authorization header (overrides the client OptReqToken option)
* `OptQuery(value url.Values)` sets the query parameters to a request
* `OptHeader(key, value string)` appends a custom header to the request
* `OptResponse(func() error)` allows you to set a callback function to process a streaming response.
See below for more details.
* `OptHeader(key, value string)` sets a custom header to the request
* `OptNoTimeout()` disables the timeout on the request, which is useful for long running requests
* `OptTextStreamCallback(func(TextStreamCallback) error)` allows you to set a callback
function to process a streaming text response of type `text/event-stream`. See below for
more details.

## Authentication

Expand Down Expand Up @@ -185,10 +191,45 @@ You can also set the token on a per-request basis using the `OptToken` option in

You can create a payload with form data:

* `client.NewFormRequest(payload any, accept string)` returns a new request with a Form data payload which defaults to POST.
* `client.NewMultipartRequest(payload any, accept string)` returns a new request with a Multipart Form data payload which defaults to POST. This is useful for file uploads.
* `client.NewFormRequest(payload any, accept string)` returns a new request with a Form
data payload which defaults to POST.
* `client.NewMultipartRequest(payload any, accept string)` returns a new request with
a Multipart Form data payload which defaults to POST. This is useful for file uploads.

The payload should be a `struct` where the fields are converted to form tuples. File uploads require a field of type `multipart.File`.
The payload should be a `struct` where the fields are converted to form tuples. File uploads require a field of type `multipart.File`. For example,

```go
package main

import (
client "github.com/mutablelogic/go-client"
multipart "github.com/mutablelogic/go-client/pkg/multipart"
)

type FileUpload struct {
File multipart.File `json:"file"`
}

func main() {
// Create a new client
c := client.New(client.OptEndpoint("https://api.example.com/api/v1"))

// Create a file upload request
request := FileUpload{
File: multipart.File{
Path: "helloworld.txt",
Body: strings.NewReader("Hello, world!"),
}
}

// Upload a file
if payload, err := client.NewMultipartRequest(request, "*/*"); err != nil {
// Handle error
} else if err := c.Do(payload, &response, OptPath("upload")); err != nil {
// Handle error
}
}
```

## Unmarshalling responses

Expand All @@ -202,6 +243,32 @@ type Unmarshaler interface {

## Streaming Responses

If the returned content is a stream of JSON responses, then you can use the `OptResponse(fn func() error)` option, which
will be called by the `Do` method for each response. The function should return an error if the stream should be terminated.
The client implements a streaming text event callback which can be used to process a stream of text events, as per the [Mozilla specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).

In order to process streamed events, pass the `OptTextStreamCallback()` option to the request
with a callback function, which should have the following signature:

```go
func Callback(event client.TextStreamEvent) error {
// Finish processing successfully
if event.Event == "close" {
return io.EOF
}

// Decode the data into a JSON object
var data map[string]any
if err := event.Json(data); err != nil {
return err
}

// Return success - continue streaming
return nil
}
```

The `TextStreamEvent` object has the following

If you return an error of type `io.EOF` from the callback, then the stream will be closed.
Similarly, if you return any other error the stream will be closed and the error returned.

Usually, you would pair this option with `OptNoTimeout` to prevent the request from timing out.
34 changes: 8 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"mime"
Expand Down Expand Up @@ -57,7 +56,6 @@ const (
PathSeparator = string(os.PathSeparator)
ContentTypeAny = "*/*"
ContentTypeJson = "application/json"
ContentTypeJsonStream = "application/x-ndjson"
ContentTypeTextXml = "text/xml"
ContentTypeApplicationXml = "application/xml"
ContentTypeTextPlain = "text/plain"
Expand Down Expand Up @@ -301,31 +299,20 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
return nil
}

// Decode the body - and call any callback once the body has been decoded
// Decode the body
switch mimetype {
case ContentTypeJson, ContentTypeJsonStream:
dec := json.NewDecoder(response.Body)
for {
if err := dec.Decode(out); errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
if reqopts.callback != nil {
if err := reqopts.callback(); err != nil {
return err
}
}
case ContentTypeJson:
if err := json.NewDecoder(response.Body).Decode(out); err != nil {
return err
}
case ContentTypeTextStream:
if err := NewTextStream().Decode(response.Body, reqopts.textStreamCallback); err != nil {
return err
}
case ContentTypeTextXml, ContentTypeApplicationXml:
if err := xml.NewDecoder(response.Body).Decode(out); err != nil {
return err
}
if reqopts.callback != nil {
if err := reqopts.callback(); err != nil {
return err
}
}
default:
if v, ok := out.(Unmarshaler); ok {
return v.Unmarshal(mimetype, response.Body)
Expand All @@ -336,11 +323,6 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
} else {
return ErrInternalAppError.Withf("do: response does not implement Unmarshaler for %q", mimetype)
}
if reqopts.callback != nil {
if err := reqopts.callback(); err != nil {
return err
}
}
}

// Return success
Expand Down
2 changes: 1 addition & 1 deletion clientopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func OptUserAgent(value string) ClientOpt {
// Setting verbose to true also displays the JSON response
func OptTrace(w io.Writer, verbose bool) ClientOpt {
return func(client *Client) error {
client.Client.Transport = NewLogTransport(w, client.Client.Transport, verbose)
client.Client.Transport = newLogTransport(w, client.Client.Transport, verbose)
return nil
}
}
Expand Down
23 changes: 21 additions & 2 deletions cmd/api/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,21 @@ func openaiChat(ctx context.Context, w *tablewriter.Writer, args []string) error
opts = append(opts, openai.OptResponseFormat(openaiResponseFormat))
}
if openaiStream {
opts = append(opts, openai.OptStream())
opts = append(opts, openai.OptStream(func(choice schema.MessageChoice) {
w := w.Output()
if choice.Delta == nil {
return
}
if choice.Delta.Role != "" {
fmt.Fprintf(w, "\nrole: %q\n", choice.Delta.Role)
}
if choice.Delta.Content != "" {
fmt.Fprintf(w, "%v", choice.Delta.Content)
}
if choice.FinishReason != "" {
fmt.Printf("\nfinish_reason: %q\n", choice.FinishReason)
}
}))
}
if openaiUser != "" {
opts = append(opts, openai.OptUser(openaiUser))
Expand All @@ -243,7 +257,12 @@ func openaiChat(ctx context.Context, w *tablewriter.Writer, args []string) error
return err
}

return w.Write(responses)
// Write table (if not streaming)
if !openaiStream {
return w.Write(responses)
} else {
return nil
}
}

func openaiTranscribe(ctx context.Context, w *tablewriter.Writer, args []string) error {
Expand Down
7 changes: 1 addition & 6 deletions pkg/ollama/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ func (c *Client) ChatGenerate(ctx context.Context, model, prompt string, opts ..
// Create a new request
if req, err := client.NewJSONRequest(request); err != nil {
return response.ChatStatus, err
} else if err := c.DoWithContext(ctx, req, &response, client.OptPath("generate"), client.OptNoTimeout(), client.OptResponse(func() error {
if request.callback != nil && response.Response != "" {
request.callback(response.Response)
}
return nil
})); err != nil {
} else if err := c.DoWithContext(ctx, req, &response, client.OptPath("generate"), client.OptNoTimeout()); err != nil {
return response.ChatStatus, err
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/ollama/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ollama
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

Expand Down Expand Up @@ -148,10 +147,7 @@ func (c *Client) PullModel(ctx context.Context, name string) error {

// Send the request
var response respPullModel
return c.DoWithContext(ctx, req, &response, client.OptPath("pull"), client.OptNoTimeout(), client.OptResponse(func() error {
fmt.Println("TOOD:", response)
return nil
}))
return c.DoWithContext(ctx, req, &response, client.OptPath("pull"), client.OptNoTimeout())
}

// Create a new model with a name and contents of the Modelfile
Expand Down
Loading

0 comments on commit f96eb6f

Please sign in to comment.