Skip to content

Commit

Permalink
Merge pull request #10 from cosmo0920/add-endpoint
Browse files Browse the repository at this point in the history
Add endpoint parameter
  • Loading branch information
cosmo0920 authored Dec 11, 2019
2 parents b46990b + 96072a9 commit 9a2e188
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 9 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ $ make
| S3Prefix | S3Prefix of S3 key | `-` | Mandatory parameter |
| Region | Region of S3 | `-` | Mandatory parameter |
| Compress | Choose Compress method | `""` | gzip or plainText(`""`) |
| Endpoint | Specify the endpoint URL | `""` | URL with port or empty string |

Example:

Expand All @@ -83,6 +84,8 @@ Add this section to fluent-bit.conf:
S3Prefix yours3prefixname
Region us-east-1
Compress gzip
# Endpoint parameter is mainly used for minio.
# Endpoint http://localhost:9000
```

fluent-bit-go-s3 supports the following credentials. Users must specify one of them:
Expand Down
13 changes: 10 additions & 3 deletions out_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3, error) {
s3prefix := plugin.PluginConfigKey(ctx, "S3Prefix")
region := plugin.PluginConfigKey(ctx, "Region")
compress := plugin.PluginConfigKey(ctx, "Compress")
endpoint := plugin.PluginConfigKey(ctx, "Endpoint")

config, err := getS3Config(accessKeyID, secretAccessKey, credential, s3prefix, bucket, region, compress)
config, err := getS3Config(accessKeyID, secretAccessKey, credential, s3prefix, bucket, region, compress, endpoint)
if err != nil {
return nil, err
}
Expand All @@ -134,11 +135,17 @@ func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3, error) {
fmt.Printf("[flb-go %d] plugin s3prefix parameter = '%s'\n", operatorID, s3prefix)
fmt.Printf("[flb-go %d] plugin region parameter = '%s'\n", operatorID, region)
fmt.Printf("[flb-go %d] plugin compress parameter = '%s'\n", operatorID, compress)
fmt.Printf("[flb-go %d] plugin endpoint parameter = '%s'\n", operatorID, endpoint)

sess := session.New(&aws.Config{
cfg := aws.Config{
Credentials: config.credentials,
Region: config.region,
})
}
if config.endpoint != "" {
cfg.WithEndpoint(config.endpoint).WithS3ForcePathStyle(true)
}

sess := session.New(&cfg)

uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 5 * 1024 * 1024
Expand Down
10 changes: 8 additions & 2 deletions out_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type testFluentPlugin struct {
s3prefix string
region string
compress string
endpoint string
records []testrecord
position int
events []*events
Expand All @@ -181,6 +182,8 @@ func (p *testFluentPlugin) PluginConfigKey(ctx unsafe.Pointer, key string) strin
return p.region
case "Compress":
return p.compress
case "Endpoint":
return p.endpoint
}
return "unknown-" + key
}
Expand Down Expand Up @@ -240,7 +243,7 @@ func (c *testS3Credential) GetCredentials(accessID, secretkey, credential string

func TestPluginInitializationWithStaticCredentials(t *testing.T) {
s3Creds = &testS3Credential{}
_, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "")
_, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "")
if err != nil {
t.Fatalf("failed test %#v", err)
}
Expand All @@ -251,14 +254,15 @@ func TestPluginInitializationWithStaticCredentials(t *testing.T) {
s3prefix: "exampleprefix",
region: "exampleregion",
compress: "",
endpoint: "",
}
res := FLBPluginInit(unsafe.Pointer(&plugin))
assert.Equal(t, output.FLB_OK, res)
}

func TestPluginInitializationWithSharedCredentials(t *testing.T) {
s3Creds = &testS3Credential{}
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "")
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "")
if err != nil {
t.Fatalf("failed test %#v", err)
}
Expand All @@ -268,6 +272,7 @@ func TestPluginInitializationWithSharedCredentials(t *testing.T) {
s3prefix: "exampleprefix",
region: "exampleregion",
compress: "",
endpoint: "",
}
res := FLBPluginInit(unsafe.Pointer(&plugin))
assert.Equal(t, output.FLB_OK, res)
Expand All @@ -281,6 +286,7 @@ func TestPluginFlusher(t *testing.T) {
bucket: "examplebucket",
s3prefix: "exampleprefix",
compress: "",
endpoint: "",
}
ts := time.Date(2019, time.March, 10, 10, 11, 12, 0, time.UTC)
testrecords := map[interface{}]interface{}{
Expand Down
7 changes: 6 additions & 1 deletion s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type s3Config struct {
s3prefix *string
region *string
compress format
endpoint string
}

type S3Credential interface {
Expand Down Expand Up @@ -58,7 +59,7 @@ func (c *s3PluginConfig) GetCredentials(accessKeyID, secretKey, credential strin
return nil, fmt.Errorf("Failed to create credentials")
}

func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, compress string) (*s3Config, error) {
func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, compress, endpoint string) (*s3Config, error) {
conf := &s3Config{}
creds, err := s3Creds.GetCredentials(accessID, secretKey, credential)
if err != nil {
Expand Down Expand Up @@ -88,5 +89,9 @@ func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, comp
conf.compress = plainTextFormat
}

if endpoint != "" {
conf.endpoint = endpoint
}

return conf, nil
}
21 changes: 18 additions & 3 deletions s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestGetS3ConfigStaticCredentials(t *testing.T) {
conf, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "")
conf, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "")
if err != nil {
t.Fatalf("failed test %#v", err)
}
Expand All @@ -21,7 +21,7 @@ func TestGetS3ConfigStaticCredentials(t *testing.T) {

func TestGetS3ConfigSharedCredentials(t *testing.T) {
s3Creds = &testS3Credential{}
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "")
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "")
if err != nil {
t.Fatalf("failed test %#v", err)
}
Expand All @@ -35,7 +35,7 @@ func TestGetS3ConfigSharedCredentials(t *testing.T) {

func TestGetS3ConfigCompression(t *testing.T) {
s3Creds = &testS3Credential{}
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip")
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "")
if err != nil {
t.Fatalf("failed test %#v", err)
}
Expand All @@ -46,3 +46,18 @@ func TestGetS3ConfigCompression(t *testing.T) {
assert.Equal(t, "exampleregion", *conf.region, "Specify s3prefix name")
assert.Equal(t, gzipFormat, conf.compress, "Specify compression method")
}

func TestGetS3ConfigEndpoint(t *testing.T) {
s3Creds = &testS3Credential{}
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "http://localhost:9000")
if err != nil {
t.Fatalf("failed test %#v", err)
}

assert.Equal(t, "examplebucket", *conf.bucket, "Specify bucket name")
assert.Equal(t, "exampleprefix", *conf.s3prefix, "Specify s3prefix name")
assert.NotNil(t, conf.credentials, "credentials not to be nil")
assert.Equal(t, "exampleregion", *conf.region, "Specify s3prefix name")
assert.Equal(t, gzipFormat, conf.compress, "Specify compression method")
assert.Equal(t, "http://localhost:9000", conf.endpoint, "Specify correct endpoint")
}

0 comments on commit 9a2e188

Please sign in to comment.