diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index ce0bb90fd..581fb7776 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "log" "net/url" "strings" "time" @@ -70,7 +71,8 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { - return err + log.Printf("failed to parse syslog, dropping faulty message, err: %s", err) + return nil } for _, msg := range msgs { diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index 92fa25264..c4c699261 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -3,6 +3,7 @@ package syslog import ( "bytes" "crypto/tls" + "log" "time" "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" @@ -51,7 +52,8 @@ func NewHTTPSBatchWriter( func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { - return err + log.Printf("Failed to parse syslog, dropping faulty message, err: %s", err) + return nil } for _, msg := range msgs { @@ -61,28 +63,35 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { } func (w *HTTPSBatchWriter) startSender() { + t := time.NewTimer(w.sendInterval) var msgBatch bytes.Buffer var msgCount float64 + reset := func() { + msgBatch.Reset() + msgCount = 0 + t.Reset(w.sendInterval) + } for { select { case msg := <-w.msgs: - msgBatch.Write(msg) - msgCount++ - if msgBatch.Len() >= w.batchSize { - w.sendHttpRequest(msgBatch.Bytes(), msgCount) - msgBatch.Reset() - msgCount = 0 - t.Reset(w.sendInterval) + length, buffer_err := msgBatch.Write(msg) + if buffer_err != nil { + log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", length, buffer_err) + reset() + } else { + msgCount++ + if length >= w.batchSize { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck + reset() + } } case <-t.C: if msgBatch.Len() > 0 { - w.sendHttpRequest(msgBatch.Bytes(), msgCount) - msgBatch.Reset() - msgCount = 0 + w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck + reset() } - t.Reset(w.sendInterval) } } }