Skip to content

Commit

Permalink
Implement methods for indexer acknowledgement
Browse files Browse the repository at this point in the history
  • Loading branch information
spectrumjade committed Feb 26, 2019
1 parent b5cffca commit 3b2d7e0
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 6 deletions.
86 changes: 83 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

Expand All @@ -17,6 +19,8 @@ const (
retryWaitTime = 1 * time.Second

defaultMaxContentLength = 1000000

defaultAcknowledgementTimeout = 90 * time.Second
)

type Client struct {
Expand Down Expand Up @@ -44,7 +48,7 @@ type Client struct {
maxLength int

// List of acknowledgement IDs provided by Splunk
ackIDs []string
ackIDs []int

// Mutex to allow threadsafe acknowledgement checking
ackMux sync.Mutex
Expand Down Expand Up @@ -172,6 +176,82 @@ func (hec *Client) WriteRaw(reader io.ReadSeeker, metadata *EventMetadata) error
return hec.WriteRawWithContext(context.Background(), reader, metadata)
}

type acknowledgementRequest struct {
Acks []int `json:"acks"`
}

// WaitForAcknowledgementWithContext blocks until the Splunk indexer has
// acknowledged that all previously submitted data has been successfully
// indexed or if the provided context is cancelled. This requires the HEC token
// configuration in Splunk to have indexer acknowledgement enabled.
func (hec *Client) WaitForAcknowledgementWithContext(ctx context.Context) error {
// Make our own copy of the list of acknowledgement IDs and remove them
// from the client while we check them.
hec.ackMux.Lock()
ackIDs := hec.ackIDs
hec.ackIDs = nil
hec.ackMux.Unlock()

if len(ackIDs) == 0 {
return nil
}

endpoint := "/services/collector/ack?channel=" + hec.channel

for {
ackRequestData, _ := json.Marshal(acknowledgementRequest{Acks: ackIDs})

response, err := hec.makeRequest(ctx, endpoint, ackRequestData)
if err != nil {
// Put the remaining unacknowledged IDs back
hec.ackMux.Lock()
hec.ackIDs = append(hec.ackIDs, ackIDs...)
hec.ackMux.Unlock()
return err
}

for ackIDString, status := range response.Acks {
if status {
ackID, err := strconv.Atoi(ackIDString)
if err != nil {
return fmt.Errorf("could not convert ack ID to int: %v", err)
}

ackIDs = remove(ackIDs, ackID)
}
}

if len(ackIDs) == 0 {
break
}

// If the server did not indicate that all acknowledgements have been
// made, check again after a short delay.
select {
case <-time.After(retryWaitTime):
continue
case <-ctx.Done():
// Put the remaining unacknowledged IDs back
hec.ackMux.Lock()
hec.ackIDs = append(hec.ackIDs, ackIDs...)
hec.ackMux.Unlock()
return ctx.Err()
}
}

return nil
}

// WaitForAcknowledgement blocks until the Splunk indexer has acknowledged
// that all previously submitted data has been successfully indexed or if the
// default acknowledgement timeout is reached. This requires the HEC token
// configuration in Splunk to have indexer acknowledgement enabled.
func (hec *Client) WaitForAcknowledgement() error {
ctx, cancel := context.WithTimeout(context.Background(), defaultAcknowledgementTimeout)
defer cancel()
return hec.WaitForAcknowledgementWithContext(ctx)
}

// breakStream breaks text from reader into chunks, with every chunk less than max.
// Unless a single line is longer than max, it always cut at end of lines ("\n")
func breakStream(reader io.ReadSeeker, max int, callback func(chunk []byte) error) error {
Expand Down Expand Up @@ -279,11 +359,11 @@ func (hec *Client) write(ctx context.Context, endpoint string, data []byte) erro
}

// Check for acknowledgement IDs and store them if provided
if response.AckID != "" {
if response.AckID != nil {
hec.ackMux.Lock()
defer hec.ackMux.Unlock()

hec.ackIDs = append(hec.ackIDs, response.AckID)
hec.ackIDs = append(hec.ackIDs, *response.AckID)
}

return nil
Expand Down
7 changes: 4 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

// Response is response message from HEC. For example, `{"text":"Success","code":0}`.
type Response struct {
Text string `json:"text"`
Code int `json:"code"`
AckID string `json:"ackID,omitempty"`
Text string `json:"text"`
Code int `json:"code"`
AckID *int `json:"ackId"` // Use a pointer so we can differentiate between a 0 and an ack ID not being specified
Acks map[string]bool `json:"acks"` // Splunk returns ack IDs as strings rather than ints
}

// Response status codes
Expand Down
7 changes: 7 additions & 0 deletions hec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hec

import (
"context"
"io"
"net/http"
)
Expand All @@ -20,4 +21,10 @@ type HEC interface {

// WriteRaw writes raw data stream via HEC raw mode
WriteRaw(reader io.ReadSeeker, metadata *EventMetadata) error

// WaitForAcknowledgement blocks until the Splunk indexer acknowledges data sent to it
WaitForAcknowledgement() error

// WaitForAcknowledgementWithContext blocks until the Splunk indexer acknowledges data sent to it with a context for cancellation
WaitForAcknowledgementWithContext(ctx context.Context) error
}
11 changes: 11 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package hec

func remove(l []int, item int) []int {
for i, other := range l {
if other == item {
return append(l[:i], l[i+1:]...)
}
}

return l
}

0 comments on commit 3b2d7e0

Please sign in to comment.