Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BufferedClient Options & Compression #59

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 167 additions & 42 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package gotsrpc

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"

"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/ugorji/go/codec"
)
Expand All @@ -15,35 +17,42 @@ const (
HeaderServiceToService = "X-Foomo-S2S"
)

type Compressor int

const (
CompressorNone Compressor = iota
CompressorGZIP
CompressorSnappy
)

func (c Compressor) String() string {
switch c {
case CompressorNone:
return "none"
case CompressorGZIP:
return "gzip"
case CompressorSnappy:
return "snappy"
default:
return "unknown"
}
}

// ClientTransport to use for calls
// var ClientTransport = &http.Transport{}

var _ Client = &bufferedClient{}
var _ Client = &BufferedClient{}

type Client interface {
Call(ctx context.Context, url string, endpoint string, method string, args []interface{}, reply []interface{}) (err error)
SetClientEncoding(encoding ClientEncoding)
SetTransportHttpClient(client *http.Client)
SetDefaultHeaders(headers http.Header)
}

func NewClient() Client {
return &bufferedClient{client: defaultHttpFactory(), handle: getHandleForEncoding(EncodingMsgpack), headers: nil}
}

func NewClientWithHttpClient(client *http.Client) Client { //nolint:stylecheck
if client != nil {
return &bufferedClient{client: client, handle: getHandleForEncoding(EncodingMsgpack), headers: nil}
} else {
return &bufferedClient{client: defaultHttpFactory(), handle: getHandleForEncoding(EncodingMsgpack), headers: nil}
}
return NewBufferedClient(WithHTTPClient(client))
}

func newRequest(ctx context.Context, url string, contentType string, buffer *bytes.Buffer, headers http.Header) (r *http.Request, err error) {
if buffer == nil {
buffer = &bytes.Buffer{}
}
request, errRequest := http.NewRequestWithContext(ctx, http.MethodPost, url, buffer)
func newRequest(ctx context.Context, url string, contentType string, reader io.Reader, headers http.Header) (r *http.Request, err error) {
request, errRequest := http.NewRequestWithContext(ctx, http.MethodPost, url, reader)
if errRequest != nil {
return nil, errors.Wrap(errRequest, "could not create a request")
}
Expand All @@ -57,48 +66,142 @@ func newRequest(ctx context.Context, url string, contentType string, buffer *byt
return request, nil
}

type bufferedClient struct {
client *http.Client
handle *clientHandle
headers http.Header
type BufferedClient struct {
client *http.Client
handle *clientHandle
headers http.Header
compressor Compressor
}

// ClientOption is a function that configures a BufferedClient.
type ClientOption func(*BufferedClient)

// WithHTTPClient allows you to specify a custom *http.Client.
func WithHTTPClient(c *http.Client) ClientOption {
return func(bc *BufferedClient) {
if c == nil {
bc.client = defaultHttpFactory()
} else {
bc.client = c
}
}
}

func WithClientEncoding(encoding ClientEncoding) ClientOption {
return func(bc *BufferedClient) {
bc.handle = getHandleForType(encoding)
}
}

// WithHeaders allows you to specify custom HTTP headers.
func WithHeaders(h http.Header) ClientOption {
return func(bc *BufferedClient) {
bc.headers = h
}
}

func WithCompressor(compressor Compressor) ClientOption {
return func(bc *BufferedClient) {
bc.compressor = compressor
}
}

func (c *bufferedClient) SetDefaultHeaders(headers http.Header) {
c.headers = headers
func WithSnappyCompression() ClientOption {
return WithCompressor(CompressorSnappy)
}

func (c *bufferedClient) SetClientEncoding(encoding ClientEncoding) {
c.handle = getHandleForEncoding(encoding)
func WithGZIPCompression() ClientOption {
return WithCompressor(CompressorGZIP)
}

func (c *bufferedClient) SetTransportHttpClient(client *http.Client) { //nolint:stylecheck
c.client = client
func WithNoCompression() ClientOption {
return WithCompressor(CompressorNone)
}

// NewBufferedClient is the constructor that applies all functional options.
func NewBufferedClient(opts ...ClientOption) *BufferedClient {
// Set reasonable defaults here
bc := &BufferedClient{
client: defaultHttpFactory(),
headers: make(http.Header),
handle: getHandleForType(EncodingMsgpack),
compressor: CompressorNone,
}

// Apply each option
for _, opt := range opts {
opt(bc)
}
return bc
}

// Call calls a method on the remove service
func (c *bufferedClient) Call(ctx context.Context, url string, endpoint string, method string, args []interface{}, reply []interface{}) error {
func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string, method string, args []interface{}, reply []interface{}) error {
// Marshall args
b := new(bytes.Buffer)
buffer := &bytes.Buffer{}

// If no arguments are set, remove
if len(args) > 0 {
if err := codec.NewEncoder(b, c.handle.handle).Encode(args); err != nil {
return NewClientError(errors.Wrap(err, "failed to encode arguments"))

var encodeWriter io.Writer
switch c.compressor {
case CompressorGZIP:
if gzipWriter, ok := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer); ok {
gzipWriter.Reset(buffer)

defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter)

encodeWriter = gzipWriter
}
case CompressorSnappy:
if snappyWriter, ok := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer); ok {
snappyWriter.Reset(buffer)

defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter)
encodeWriter = snappyWriter
}
case CompressorNone:
encodeWriter = buffer
default:
encodeWriter = buffer
}

err := codec.NewEncoder(encodeWriter, c.handle.handle).Encode(args)
if err != nil {
return errors.Wrap(err, "could not encode data")
}

if writer, ok := encodeWriter.(io.Closer); ok {
if err = writer.Close(); err != nil {
return errors.Wrap(err, "failed to write to request body")
}
}

// Create post url
postURL := fmt.Sprintf("%s%s/%s", url, endpoint, method)
req, err := newRequest(ctx, postURL, c.handle.contentType, buffer, c.headers.Clone())
if err != nil {
return NewClientError(errors.Wrap(err, "failed to create request"))
}

// Create request
request, errRequest := newRequest(ctx, postURL, c.handle.contentType, b, c.headers.Clone())
if errRequest != nil {
return NewClientError(errors.Wrap(errRequest, "failed to create request"))
switch c.compressor {
case CompressorGZIP:
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Accept-Encoding", "gzip")
case CompressorSnappy:
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Accept-Encoding", "snappy")
case CompressorNone:
// Disable Automatic Compression
// https://http.dev/accept-encoding
req.Header.Set("Accept-Encoding", "identity")
// uncompressed, nothing to do
default:
// uncompressed, nothing to do
}

resp, errDo := c.client.Do(request)
if errDo != nil {
return NewClientError(errors.Wrap(errDo, "failed to send request"))
resp, err := c.client.Do(req)
if err != nil {
return NewClientError(errors.Wrap(err, "failed to send request"))
}
defer resp.Body.Close()

Expand All @@ -112,7 +215,6 @@ func (c *bufferedClient) Call(ctx context.Context, url string, endpoint string,
}
return NewClientError(NewHTTPError(msg, resp.StatusCode))
}

clientHandle := getHandlerForContentType(resp.Header.Get("Content-Type"))

wrappedReply := reply
Expand All @@ -124,7 +226,30 @@ func (c *bufferedClient) Call(ctx context.Context, url string, endpoint string,
}
}

if err := codec.NewDecoder(resp.Body, clientHandle.handle).Decode(wrappedReply); err != nil {
var responseBodyReader io.Reader
switch resp.Header.Get("Content-Encoding") {
case "snappy":
if snappyReader, ok := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader); ok {
defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader)

snappyReader.Reset(resp.Body)
responseBodyReader = snappyReader
}
case "gzip":
if gzipReader, ok := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader); ok {
defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader)

err := gzipReader.Reset(resp.Body)
if err != nil {
return NewClientError(errors.Wrap(err, "could not create gzip reader"))
}
responseBodyReader = gzipReader
}
default:
responseBodyReader = resp.Body
}

if err := codec.NewDecoder(responseBodyReader, clientHandle.handle).Decode(wrappedReply); err != nil {
return NewClientError(errors.Wrap(err, "failed to decode response"))
}

Expand Down
Loading
Loading