Skip to content

Commit

Permalink
Simplify implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rmikalkenas committed Mar 18, 2024
1 parent 3207961 commit a7cb47c
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 73 deletions.
83 changes: 83 additions & 0 deletions bpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package sendremotefile

import (
"sync"
)

const (
OneMB uint = 1024 * 1024 * 1
FiveMB uint = 1024 * 1024 * 5
TenMB uint = 1024 * 1024 * 10
)

type bpool struct {
*sync.Map
}

func NewBytePool() *bpool {
var frameChunkedPool = &sync.Map{}
var preallocate = &sync.Once{}
preallocate.Do(internalAllocate(frameChunkedPool))

return &bpool{
frameChunkedPool,
}
}

func internalAllocate(frameChunkedPool *sync.Map) func() {
return func() {
pool1 := &sync.Pool{
New: func() any {
data := make([]byte, OneMB)
return &data
},
}
pool5 := &sync.Pool{
New: func() any {
data := make([]byte, FiveMB)
return &data
},
}
pool10 := &sync.Pool{
New: func() any {
data := make([]byte, TenMB)
return &data
},
}

frameChunkedPool.Store(OneMB, pool1)
frameChunkedPool.Store(FiveMB, pool5)
frameChunkedPool.Store(TenMB, pool10)
}
}

func (bpool *bpool) get(size uint) *[]byte {
switch {
case size <= OneMB:
val, _ := bpool.Load(OneMB)
return val.(*sync.Pool).Get().(*[]byte)
case size <= FiveMB:
val, _ := bpool.Load(FiveMB)
return val.(*sync.Pool).Get().(*[]byte)
default:
val, _ := bpool.Load(TenMB)
return val.(*sync.Pool).Get().(*[]byte)
}
}

func (bpool *bpool) put(size uint, data *[]byte) {
switch {
case size <= OneMB:
pool, _ := bpool.Load(OneMB)
pool.(*sync.Pool).Put(data)
return
case size <= FiveMB:
pool, _ := bpool.Load(FiveMB)
pool.(*sync.Pool).Put(data)
return
default:
pool, _ := bpool.Load(TenMB)
pool.(*sync.Pool).Put(data)
return
}
}
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewClient(url string, timeout time.Duration) *client {
}

func (c *client) Request() (*http.Response, error) {
req, err := http.NewRequest("GET", c.url, nil)
req, err := http.NewRequest(http.MethodGet, c.url, nil)
if err != nil {
return nil, err
}
Expand Down
90 changes: 36 additions & 54 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

rrErrors "github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

const (
RootPluginName string = "http"
PluginName string = "sendremotefile"
ContentTypeKey string = "Content-Type"
ContentTypeVal string = "application/octet-stream"
xSendRemoteHeader string = "X-Sendremotefile"
defaultBufferSize int = 10 * 1024 * 1024 // 10MB chunks
timeout time.Duration = 5 * time.Second
rootPluginName string = "http"
pluginName string = "sendremotefile"
responseContentTypeKey string = "Content-Type"
responseContentTypeVal string = "application/octet-stream"
responseStatusCode int = http.StatusOK
xSendRemoteHeader string = "X-Sendremotefile"
defaultBufferSize uint = TenMB
timeout time.Duration = 5 * time.Second
)

type Configurer interface {
Expand All @@ -35,36 +35,29 @@ type Logger interface {

type Plugin struct {
log *zap.Logger
writersPool sync.Pool
bytesPool *bpool
writersPool *wpool
}

func (p *Plugin) Init(cfg Configurer, log Logger) error {
const op = rrErrors.Op("sendremotefile_plugin_init")

if !cfg.Has(RootPluginName) {
if !cfg.Has(rootPluginName) {
return rrErrors.E(op, rrErrors.Disabled)
}

p.log = log.NamedLogger(PluginName)

p.writersPool = sync.Pool{
New: func() any {
wr := new(writer)
wr.code = http.StatusOK
wr.data = make([]byte, 0, 10)
wr.hdrToSend = make(map[string][]string, 2)
return wr
},
}
p.log = log.NamedLogger(pluginName)
p.bytesPool = NewBytePool()
p.writersPool = NewWriterPool()

return nil
}

func (p *Plugin) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rrWriter := p.getWriter()
rrWriter := p.writersPool.get()
defer func() {
p.putWriter(rrWriter)
p.writersPool.put(rrWriter)
_ = r.Body.Close()
}()

Expand All @@ -73,9 +66,10 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// if there is no X-Sendremotefile header from the PHP worker, just return
if url := rrWriter.Header().Get(xSendRemoteHeader); url == "" {
// re-add original headers, status code and body
maps.Copy(w.Header(), rrWriter.hdrToSend)
maps.Copy(w.Header(), rrWriter.Header())
w.WriteHeader(rrWriter.code)
if len(rrWriter.data) > 0 {
// write a body if exists
_, err := w.Write(rrWriter.data)
if err != nil {
p.log.Error("failed to write data to the response", zap.Error(err))
Expand Down Expand Up @@ -110,37 +104,38 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}

defer func() {
_ = resp.Body.Close()
err = resp.Body.Close()
if err != nil {
p.log.Error("failed to close upstream response body", zap.Error(err))
}
}()

if resp.StatusCode != http.StatusOK {
p.log.Error("invalid upstream response status code", zap.Int("status_code", resp.StatusCode))
p.log.Error("invalid upstream response status code", zap.Int("rr_response_code", rrWriter.code), zap.Int("remotefile_response_code", resp.StatusCode))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

var buf []byte
// do not allocate large buffer for the small files
if size := resp.ContentLength; size > 0 && size < int64(defaultBufferSize) {
// allocate based on provided content length
buf = make([]byte, size)
} else {
// allocate default buffer
buf = make([]byte, defaultBufferSize)
var pl = defaultBufferSize
if cl := resp.ContentLength; cl > 0 {
pl = uint(cl)
}

pb := p.bytesPool.get(pl)

// re-add original headers
maps.Copy(w.Header(), rrWriter.hdrToSend)
maps.Copy(w.Header(), rrWriter.Header())
// overwrite content-type header
w.Header().Set(ContentTypeKey, ContentTypeVal)
w.Header().Set(responseContentTypeKey, responseContentTypeVal)
w.WriteHeader(responseStatusCode)

rc := http.NewResponseController(w)

for {
nr, er := resp.Body.Read(buf)
nr, er := resp.Body.Read(*pb)

if nr > 0 {
nw, ew := w.Write(buf[0:nr])
nw, ew := w.Write((*pb)[:nr])

if nw > 0 {
if ef := rc.Flush(); ef != nil {
Expand All @@ -164,25 +159,12 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
break
}
}

p.bytesPool.put(pl, pb)
})
}

// Middleware/plugin name.
func (p *Plugin) Name() string {
return PluginName
}

func (p *Plugin) getWriter() *writer {
return p.writersPool.Get().(*writer)
}

func (p *Plugin) putWriter(w *writer) {
w.code = http.StatusOK
w.data = make([]byte, 0, 10)

for k := range w.hdrToSend {
delete(w.hdrToSend, k)
}

p.writersPool.Put(w)
return pluginName
}
Binary file added tests/data/1MB.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions tests/php_test_files/psr-worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
break;

case "/remote-file":
$resp = new Response(200, ["X-Sendremotefile" => "http://127.0.0.1:18953/file", "Content-Disposition" => "attachment; filename=composer.json"]);
$resp = new Response(200, ["X-Sendremotefile" => "http://127.0.0.1:18953/file", "Content-Disposition" => "attachment; filename=1MB.jpg"]);
break;

case "/remote-file-not-found":
Expand All @@ -31,12 +31,12 @@
break;

case "/file":
$resp = new Response(200, ["Content-Type" => "text/plain"], $psr17Factory->createStreamFromFile(__DIR__ . "/composer.json"));
$resp = new Response(200, ["Content-Type" => "image/jpeg"], $psr17Factory->createStreamFromFile(__DIR__ . "/../data/1MB.jpg"));
break;

case "/file-timeout":
usleep(5_500_000);
$resp = new Response(200, ["Content-Type" => "text/plain"], $psr17Factory->createStreamFromFile(__DIR__ . "/composer.json"));
$resp = new Response(200, ["Content-Type" => "image/jpeg"], $psr17Factory->createStreamFromFile(__DIR__ . "/../data/1MB.jpg"));
break;

default:
Expand Down
29 changes: 14 additions & 15 deletions tests/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"testing"
Expand All @@ -29,7 +28,7 @@ func TestSendremotefileInit(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2.9.0",
Version: "2023.3.0",
Path: "configs/.rr-with-sendremotefile.yaml",
Prefix: "rr",
}
Expand All @@ -44,9 +43,7 @@ func TestSendremotefileInit(t *testing.T) {
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ch, err := cont.Serve()
assert.NoError(t, err)
Expand Down Expand Up @@ -95,7 +92,7 @@ func TestSendremotefileDisabled(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2.9.0",
Version: "2023.3.0",
Path: "configs/.rr-with-disabled-sendremotefile.yaml",
Prefix: "rr",
}
Expand All @@ -110,9 +107,7 @@ func TestSendremotefileDisabled(t *testing.T) {
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ch, err := cont.Serve()
assert.NoError(t, err)
Expand Down Expand Up @@ -174,7 +169,7 @@ func TestSendremotefileFileStream(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2.9.0",
Version: "2023.3.0",
Path: "configs/.rr-with-sendremotefile.yaml",
Prefix: "rr",
}
Expand All @@ -191,9 +186,7 @@ func TestSendremotefileFileStream(t *testing.T) {
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

ch, err := cont.Serve()
assert.NoError(t, err)
Expand Down Expand Up @@ -250,10 +243,16 @@ func remoteFileCheck(t *testing.T) {
b, err := io.ReadAll(r.Body)
require.NoError(t, err)

assert.True(t, strings.Contains(string(b), "roadrunner/sendremotefile-tests"))
file, err := os.Open("./data/1MB.jpg")
require.NoError(t, err)
defer file.Close()
fs, err := file.Stat()
require.NoError(t, err)

assert.Equal(t, int(fs.Size()), len(b))
assert.Equal(t, 200, r.StatusCode)
assert.Equal(t, "", r.Header.Get("X-Sendremotefile"))
assert.Equal(t, "attachment; filename=composer.json", r.Header.Get("Content-Disposition"))
assert.Equal(t, "attachment; filename=1MB.jpg", r.Header.Get("Content-Disposition"))
assert.Equal(t, "application/octet-stream", r.Header.Get("Content-Type"))

err = r.Body.Close()
Expand Down
Loading

0 comments on commit a7cb47c

Please sign in to comment.