Skip to content

Commit

Permalink
Merge pull request #15 from livepeer/vg/feat/os-delete-file
Browse files Browse the repository at this point in the history
drivers: Implement delete methods for S3 and GS
  • Loading branch information
thomshutt authored Feb 1, 2023
2 parents 44cbd85 + 46c1285 commit 1da2279
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 47 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.PHONY: all
all: fmt build test

.PHONY: build
build:
go build ./...

.PHONY: fmt
fmt:
go fmt ./...

.PHONY: tets
test:
go test -race ./...
4 changes: 4 additions & 0 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var ext2mime = map[string]string{
}

var ErrFormatMime = fmt.Errorf("unknown file extension")
var ErrNotSupported = fmt.Errorf("not supported")

// NodeStorage is current node's primary driver
var NodeStorage OSDriver
Expand Down Expand Up @@ -131,6 +132,9 @@ type OSSession interface {
// ListFiles return list of files
ListFiles(ctx context.Context, prefix, delim string) (PageInfo, error)

// DeleteFile deletes a single file. 'name' should be the relative filename
DeleteFile(ctx context.Context, name string) error

ReadData(ctx context.Context, name string) (*FileInfoReader, error)
}

Expand Down
5 changes: 5 additions & 0 deletions drivers/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -121,6 +122,10 @@ func (ostore *FSSession) ListFiles(ctx context.Context, dir, delim string) (Page
return pi, nil
}

func (ostore *FSSession) DeleteFile(ctx context.Context, name string) error {
return os.Remove(filepath.Join(ostore.path, name))
}

func (ostore *FSSession) ReadData(ctx context.Context, name string) (*FileInfoReader, error) {
prefix := ""
if ostore.os.baseURI != nil {
Expand Down
27 changes: 26 additions & 1 deletion drivers/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"bytes"
"context"
"crypto/rand"
"github.com/stretchr/testify/assert"
"io"
"net/url"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func readFile(session *FSSession, name string) []byte {
Expand Down Expand Up @@ -66,3 +69,25 @@ func TestFsOS(t *testing.T) {
assert.Equal(1, len(files.Directories()))
assert.Equal("name1", files.Directories()[0])
}

func TestDeleteFile(t *testing.T) {
file, err := os.CreateTemp("", "TestDeleteFileefix")
require.NoError(t, err)

// Defer a removal of the file so that we don't litter the filesystem when this test fails
defer os.Remove(file.Name())

// Confirm that the file exists
_, err = os.Stat(file.Name())
require.NoError(t, err)

// Try to delete the file
u, err := url.Parse(os.TempDir())
require.NoError(t, err)
sess := NewFSDriver(u).NewSession(os.TempDir())
require.NoError(t, sess.DeleteFile(context.Background(), filepath.Base(file.Name())))

// Check the file no longer exists
_, err = os.Stat(file.Name())
require.ErrorContains(t, err, "no such file or directory")
}
15 changes: 14 additions & 1 deletion drivers/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ func (os *gsSession) createClient() error {
return nil
}

func (os *gsSession) DeleteFile(ctx context.Context, name string) error {
if !os.useFullAPI {
return errors.New("delete not supported for non full api")
}
if os.client == nil {
if err := os.createClient(); err != nil {
return err
}
}
return os.client.Bucket(os.bucket).
Object(os.key + "/" + name).
Delete(ctx)
}

func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
if os.useFullAPI {
if os.client == nil {
Expand Down Expand Up @@ -335,7 +349,6 @@ func gsGetFields(sess *s3Session) map[string]string {
// gsCreatePolicy returns policy, signature
func gsCreatePolicy(signer *gsSigner, bucket, region, path string) (string, string) {
const timeFormat = "2006-01-02T15:04:05.999Z"
const shortTimeFormat = "20060102"

expireAt := time.Now().Add(S3_POLICY_EXPIRE_IN_HOURS * time.Hour)
expireFmt := expireAt.UTC().Format(timeFormat)
Expand Down
8 changes: 6 additions & 2 deletions drivers/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package drivers

import (
"context"
"github.com/livepeer/go-tools/clients"
"io"
"net/http"
"path"
"sync"
"time"

"github.com/livepeer/go-tools/clients"
)

type IpfsOS struct {
Expand All @@ -18,7 +19,6 @@ type IpfsOS struct {
type IpfsSession struct {
os *IpfsOS
filename string
ended bool
client clients.IPFS
dCache map[string]*dataCache
dLock sync.RWMutex
Expand Down Expand Up @@ -107,6 +107,10 @@ func (session *IpfsSession) GetInfo() *OSInfo {
return nil
}

func (ostore *IpfsSession) DeleteFile(ctx context.Context, name string) error {
return ErrNotSupported
}

func (session *IpfsSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
// concatenate filename with name argument to get full filename, both may be empty
fullPath := session.getAbsolutePath(name)
Expand Down
4 changes: 4 additions & 0 deletions drivers/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (ostore *MemorySession) EndSession() {
ostore.os.lock.Unlock()
}

func (ostore *MemorySession) DeleteFile(ctx context.Context, name string) error {
return ErrNotSupported
}

func (ostore *MemorySession) ListFiles(ctx context.Context, prefix, delim string) (PageInfo, error) {
pi := &singlePageInfo{}
if prefix == "" {
Expand Down
43 changes: 26 additions & 17 deletions drivers/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package drivers

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestLocalOS(t *testing.T) {
Expand All @@ -18,39 +18,48 @@ func TestLocalOS(t *testing.T) {
defer func() {
dataCacheLen = oldDataCacheLen
}()
assert := assert.New(t)

u, err := url.Parse("fake.com/url")
assert.NoError((err))
require.NoError(t, err)

os := NewMemoryDriver(u)
sess := os.NewSession(("sesspath")).(*MemorySession)
path, err := sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData1), nil, 0)
fmt.Println(path)
assert.Equal("fake.com/url/stream/sesspath/name1/1.ts", path)
require.NoError(t, err)
require.Equal(t, "fake.com/url/stream/sesspath/name1/1.ts", path)

data := sess.GetData("sesspath/name1/1.ts")
fmt.Printf("got Data: '%s'\n", data)
assert.Equal(tempData1, string(data))
path, err = sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData2), nil, 0)
require.Equal(t, tempData1, string(data))

_, err = sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData2), nil, 0)
require.NoError(t, err)

data = sess.GetData("sesspath/name1/1.ts")
assert.Equal(tempData2, string(data))
require.Equal(t, tempData2, string(data))

path, err = sess.SaveData(context.TODO(), "name1/2.ts", strings.NewReader(tempData3), nil, 0)
require.NoError(t, err)

data = sess.GetData("sesspath/name1/2.ts")
assert.Equal(tempData3, string(data))
require.Equal(t, tempData3, string(data))

// Test trim prefix when baseURI != nil
data = sess.GetData(path)
assert.Equal(tempData3, string(data))
require.Equal(t, tempData3, string(data))
data = sess.GetData("sesspath/name1/1.ts")
assert.Nil(data)
require.Nil(t, data)
sess.EndSession()

data = sess.GetData("sesspath/name1/2.ts")
assert.Nil(data)
require.Nil(t, data)

// Test trim prefix when baseURI = nil
os = NewMemoryDriver(nil)
sess = os.NewSession("sesspath").(*MemorySession)
path, err = sess.SaveData(context.TODO(), "name1/1.ts", strings.NewReader(tempData1), nil, 0)
assert.Nil(err)
assert.Equal("/stream/sesspath/name1/1.ts", path)
require.NoError(t, err)
require.Equal(t, "/stream/sesspath/name1/1.ts", path)

data = sess.GetData(path)
assert.Equal(tempData1, string(data))
require.Equal(t, tempData1, string(data))
}
16 changes: 16 additions & 0 deletions drivers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"
"mime/multipart"
Expand Down Expand Up @@ -372,6 +373,21 @@ func (os *s3Session) saveDataPut(ctx context.Context, name string, data io.Reade
return url, nil
}

func (os *s3Session) DeleteFile(ctx context.Context, name string) error {
if os.s3svc == nil {
return errors.New("delete not supported for non full api")
}
params := &s3.DeleteObjectInput{
Bucket: aws.String(os.bucket),
Key: aws.String(name),
}
if os.key != "" && !strings.HasPrefix(name, os.key+"/") {
params.Key = aws.String(path.Join(os.key, name))
}
_, err := os.s3svc.DeleteObjectWithContext(ctx, params)
return err
}

func (os *s3Session) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
if os.s3svc != nil {
return os.saveDataPut(ctx, name, data, meta, timeout)
Expand Down
Loading

0 comments on commit 1da2279

Please sign in to comment.