Skip to content

Commit

Permalink
Merge pull request #8 from FindHotel/s3-transport
Browse files Browse the repository at this point in the history
Add S3 transport
  • Loading branch information
velppa authored Apr 5, 2019
2 parents 2df8644 + 37db4d0 commit 22a8b9e
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 53 deletions.
63 changes: 54 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@
[[constraint]]
name = "github.com/avast/retry-go"
version = "2.1.0"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.19.1"
43 changes: 26 additions & 17 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// Version of the client.
const Version = "3.4.1"
const Version = "3.5.0"

// Client is the main API exposed by the analytics package.
// Values that satsify this interface are returned by the client constructors
Expand Down Expand Up @@ -96,6 +96,17 @@ func New(writeKey string) Client {
// values (like a negative flush interval for example). When the function
// returns an error the returned client will always be nil.
func NewWithConfig(writeKey string, config Config) (Client, error) {
c, err := newWithConfig(writeKey, config)
if err != nil {
return nil, err
}
go c.loop()
go c.loopMetrics()

return c, nil
}

func newWithConfig(writeKey string, config Config) (*client, error) {
if err := config.validate(); err != nil {
return nil, err
}
Expand All @@ -109,13 +120,11 @@ func NewWithConfig(writeKey string, config Config) (Client, error) {
http: makeHTTPClient(config.Transport),
metricsRegistry: metrics.NewRegistry(),
}

c.successCounters = c.newCounters("submitted.success")
c.failureCounters = c.newCounters("submitted.failure")
c.droppedCounters = c.newCounters("dropped")

go c.loop()
go c.loopMetrics()

return c, nil
}

Expand All @@ -141,37 +150,37 @@ func (c *client) Enqueue(msg Message) (err error) {
switch m := msg.(type) {
case Alias:
m.Type = "alias"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m

case Group:
m.Type = "group"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m

case Identify:
m.Type = "identify"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m

case Page:
m.Type = "page"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m

case Screen:
m.Type = "screen"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m

case Track:
m.Type = "track"
m.MessageId = makeMessageId(m.MessageId, id)
m.MessageId = makeMessageID(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
}
Expand Down Expand Up @@ -232,7 +241,7 @@ func (c *client) send(msgs []message) {
const attempts = 10

b, err := json.Marshal(batch{
MessageId: c.uid(),
MessageID: c.uid(),
SentAt: c.now(),
Messages: msgs,
Context: c.DefaultContext,
Expand Down Expand Up @@ -357,7 +366,7 @@ func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *execut

if msg, err = makeMessage(m, maxMessageBytes); err != nil {
c.errorf("%s - %v", err, m)
c.notifyFailure([]message{{m, nil}}, err)
c.notifyFailure([]message{msg}, err)
return
}

Expand Down Expand Up @@ -392,7 +401,7 @@ func (c *client) errorf(format string, args ...interface{}) {

func (c *client) maxBatchBytes() int {
b, _ := json.Marshal(batch{
MessageId: c.uid(),
MessageID: c.uid(),
SentAt: c.now(),
Context: c.DefaultContext,
})
Expand All @@ -401,22 +410,22 @@ func (c *client) maxBatchBytes() int {

func (c *client) notifySuccess(msgs []message) {
for _, m := range msgs {
c.successCounters(m.msg.tags()...).Inc(1)
c.successCounters(m.Msg().tags()...).Inc(1)
}
if c.Callback != nil {
for _, m := range msgs {
c.Callback.Success(m.msg)
c.Callback.Success(m.Msg())
}
}
}

func (c *client) notifyFailure(msgs []message, err error) {
for _, m := range msgs {
c.failureCounters(m.msg.tags()...).Inc(1)
c.failureCounters(m.Msg().tags()...).Inc(1)
}
if c.Callback != nil {
for _, m := range msgs {
c.Callback.Failure(m.msg, err)
c.Callback.Failure(m.Msg(), err)
}
}
}
14 changes: 11 additions & 3 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import (
"os"
)

// Instances of types implementing this interface can be used to define where
// the analytics client logs are written.
// Logger can be used to define where the analytics client logs are written.
type Logger interface {

// Analytics clients call this method to log regular messages about the
Expand All @@ -22,7 +21,7 @@ type Logger interface {
Errorf(format string, args ...interface{})
}

// This function instantiate an object that statisfies the analytics.Logger
// StdLogger instantiate an object that statisfies the analytics.Logger
// interface and send logs to standard logger passed as argument.
func StdLogger(logger *log.Logger) Logger {
return stdLogger{
Expand All @@ -45,3 +44,12 @@ func (l stdLogger) Errorf(format string, args ...interface{}) {
func newDefaultLogger() Logger {
return StdLogger(log.New(os.Stderr, "segment ", log.LstdFlags))
}

// DiscardLogger discards all log messages supplied to it.
type DiscardLogger struct{}

// Logf does nothing for DiscardLogger.
func (l DiscardLogger) Logf(format string, args ...interface{}) {}

// Errorf does nothing for DiscardLogger.
func (l DiscardLogger) Errorf(format string, args ...interface{}) {}
44 changes: 28 additions & 16 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Message interface {

// Takes a message id as first argument and returns it, unless it's the zero-
// value, in that case the default id passed as second argument is returned.
func makeMessageId(id string, def string) string {
func makeMessageID(id string, def string) string {
if len(id) == 0 {
return def
}
Expand All @@ -63,37 +63,49 @@ func makeTimestamp(t Time, def Time) Time {
// export this type because it's only meant to be used internally to send groups
// of messages in one API call.
type batch struct {
MessageId string `json:"messageId"`
MessageID string `json:"messageId"`
SentAt Time `json:"sentAt"`
Messages []message `json:"batch"`
Context *Context `json:"context"`
}

type message struct {
type serializedMessage struct {
msg Message
json []byte
}

func makeMessage(m Message, maxBytes int) (msg message, err error) {
if msg.json, err = json.Marshal(m); err == nil {
if len(msg.json) > maxBytes {
err = ErrMessageTooBig
} else {
msg.msg = m
}
}
return
func (m *serializedMessage) MarshalJSON() ([]byte, error) {
return m.json, nil
}

func (m message) MarshalJSON() ([]byte, error) {
return m.json, nil
func (m *serializedMessage) Msg() Message {
return m.msg
}

func (m message) size() int {
func (m *serializedMessage) size() int {
// The `+ 1` is for the comma that sits between each items of a JSON array.
return len(m.json) + 1
}

type message interface {
MarshalJSON() ([]byte, error)
Msg() Message
size() int
}

func makeMessage(m Message, maxBytes int) (message, error) {
result := &serializedMessage{msg: m}
b, err := json.Marshal(m)
if err != nil {
return result, err
}
if len(b) > maxBytes {
return result, ErrMessageTooBig
}
result.json = b
return result, nil
}

type messageQueue struct {
pending []message
bytes int
Expand All @@ -111,7 +123,7 @@ func (q *messageQueue) push(m message) (b []message) {
}

q.pending = append(q.pending, m)
q.bytes += len(m.json)
q.bytes += m.size()

if b == nil && len(q.pending) == q.maxBatchSize {
b = q.flush()
Expand Down
11 changes: 5 additions & 6 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
)

func TestMessageIdDefault(t *testing.T) {
if id := makeMessageId("", "42"); id != "42" {
if id := makeMessageID("", "42"); id != "42" {
t.Error("invalid default message id:", id)
}
}

func TestMessageIdNonDefault(t *testing.T) {
if id := makeMessageId("A", "42"); id != "A" {
if id := makeMessageID("A", "42"); id != "A" {
t.Error("invalid non-default message id:", id)
}
}
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestMessageQueuePushMaxBatchBytes(t *testing.T) {

q := messageQueue{
maxBatchSize: 100,
maxBatchBytes: len(m0.json) + 1,
maxBatchBytes: m0.size(),
}

if msgs := q.push(m0); msgs != nil {
Expand All @@ -76,12 +76,11 @@ func TestMakeMessage(t *testing.T) {

if msg, err := makeMessage(track, maxMessageBytes); err != nil {
t.Error("failed to make message from track message:", err)

} else if !reflect.DeepEqual(msg, message{
} else if !reflect.DeepEqual(msg.(*serializedMessage), &serializedMessage{
msg: track,
json: []byte(`{"userId":"1","event":"","timestamp":0}`),
}) {
t.Error("invalid message generated from track message:", msg.msg, string(msg.json))
t.Error("invalid message generated from track message:", msg.Msg())
}
}

Expand Down
Loading

0 comments on commit 22a8b9e

Please sign in to comment.