Skip to content

Commit

Permalink
Integrate airbrake with datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Apr 5, 2024
1 parent 4582a49 commit 8240851
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func startServer(config *config.Config, logger *logrus.Logger) (err error) {
monitoring.StartServerMetrics(config, logger, registry)
}

producerRules, err := config.ConfigureProducers(logger)
producerRules, err := config.ConfigureProducers(airbrakeHandler, logger)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/teslamotors/fleet-telemetry/datastore/zmq"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand Down Expand Up @@ -241,7 +242,7 @@ func (c *Config) prometheusEnabled() bool {
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
producers := make(map[telemetry.Dispatcher]telemetry.Producer)
producers[telemetry.Logger] = simple.NewProtoLogger(logger)

Expand All @@ -257,7 +258,7 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
return nil, errors.New("Expected Kafka to be configured")
}
convertKafkaConfig(c.Kafka)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.ReliableAckWorkers, c.AckChan, c.prometheusEnabled(), c.MetricCollector, logger)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.ReliableAckWorkers, c.AckChan, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
if err != nil {
return nil, err
}
Expand All @@ -268,7 +269,7 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
if c.Pubsub == nil {
return nil, errors.New("Expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, logger)
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, logger)
if err != nil {
return nil, err
}
Expand All @@ -284,7 +285,7 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
maxRetries = *c.Kinesis.MaxRetries
}
streamMapping := c.CreateKinesisStreamMapping(recordNames)
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, logger)
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
if err != nil {
return nil, err
}
Expand All @@ -295,7 +296,7 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
if c.ZMQ == nil {
return nil, errors.New("Expected ZMQ to be configured")
}
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, logger)
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, logger)
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand Down Expand Up @@ -134,7 +135,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

producers, err = config.ConfigureProducers(log)
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).To(HaveLen(1))

Expand All @@ -150,7 +151,7 @@ var _ = Describe("Test full application config", func() {
config.Records = map[string][]telemetry.Dispatcher{"FS": {"kinesis"}}

var err error
producers, err = config.ConfigureProducers(log)
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("Expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})
Expand Down Expand Up @@ -188,15 +189,15 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
_ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path")
_, err := pubsubConfig.ConfigureProducers(log)
_, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource"))
})

It("pubsub config works", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
var err error
producers, err = pubsubConfig.ConfigureProducers(log)
producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
Expand All @@ -215,10 +216,10 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"zmq"}}
var err error
producers, err = config.ConfigureProducers(log)
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("Expected ZMQ to be configured"))
Expect(producers).To(BeNil())
producers, err = zmqConfig.ConfigureProducers(log)
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(BeNil())
})

Expand All @@ -227,7 +228,7 @@ var _ = Describe("Test full application config", func() {
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := logrus.NoOpLogger()
var err error
producers, err = zmqConfig.ConfigureProducers(log)
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
Expand Down
17 changes: 13 additions & 4 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand All @@ -24,6 +25,7 @@ type Producer struct {
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
}

// Metrics stores metrics reported from this package
Expand Down Expand Up @@ -52,7 +54,7 @@ func configurePubsub(projectID string) (*pubsub.Client, error) {
}

// NewProducer establishes the pubsub connection and define the dispatch method
func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)
pubsubClient, err := configurePubsub(projectID)
if err != nil {
Expand All @@ -66,6 +68,7 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
}
p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
Expand All @@ -80,13 +83,13 @@ func (p *Producer) Produce(entry *telemetry.Record) {
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.logger.ErrorLog("pubsub_topic_creation_error", err, logInfo)
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.logger.ErrorLog("pubsub_topic_check_error", err, logInfo)
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{})
return
}
Expand All @@ -97,7 +100,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {
p.logger.ErrorLog("pubsub_err", err, logInfo)
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
}
Expand All @@ -119,6 +122,12 @@ func (p *Producer) createTopicIfNotExists(ctx context.Context, topic string) (*p
return p.pubsubClient.CreateTopic(ctx, topic)
}

// ReportError to airbrake and logger
func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}
Expand Down
13 changes: 11 additions & 2 deletions datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand All @@ -20,6 +21,7 @@ type Producer struct {
prometheusEnabled bool
metricsCollector metrics.MetricCollector
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
}

// Metrics stores metrics reported from this package
Expand All @@ -36,7 +38,7 @@ var (

// NewProducer establishes the kafka connection and define the dispatch method
func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers int,
ackChan chan (*telemetry.Record), prometheusEnabled bool, metricsCollector metrics.MetricCollector, logger *logrus.Logger) (telemetry.Producer, error) {
ackChan chan (*telemetry.Record), prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

kafkaProducer, err := kafka.NewProducer(config)
Expand All @@ -50,6 +52,7 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
metricsCollector: metricsCollector,
prometheusEnabled: prometheusEnabled,
logger: logger,
airbrakeHandler: airbrakeHandler,
}

for i := 0; i < reliableAckWorkers; i++ {
Expand Down Expand Up @@ -84,6 +87,12 @@ func (p *Producer) Produce(entry *telemetry.Record) {
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
}

// ReportError to airbrake and logger
func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

func headersFromRecord(record *telemetry.Record) (headers []kafka.Header) {
for key, val := range record.Metadata() {
headers = append(headers, kafka.Header{
Expand Down Expand Up @@ -111,7 +120,7 @@ func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
}

func (p *Producer) logError(err error) {
p.logger.ErrorLog("kafka_err", err, nil)
p.ReportError("kafka_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{})
}

Expand Down
15 changes: 12 additions & 3 deletions datastore/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand All @@ -22,6 +23,7 @@ type Producer struct {
prometheusEnabled bool
metricsCollector metrics.MetricCollector
streams map[string]string
airbrakeHandler *airbrake.AirbrakeHandler
}

// Metrics stores metrics reported from this package
Expand All @@ -37,7 +39,7 @@ var (
)

// NewProducer configures and tests the kinesis connection
func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(maxRetries int, streams map[string]string, overrideHost string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

config := &aws.Config{
Expand Down Expand Up @@ -66,6 +68,7 @@ func NewProducer(maxRetries int, streams map[string]string, overrideHost string,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
streams: streams,
airbrakeHandler: airbrakeHandler,
}, nil
}

Expand All @@ -74,7 +77,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
entry.ProduceTime = time.Now()
stream, ok := p.streams[entry.TxType]
if !ok {
p.logger.ErrorLog("kinesis_produce_stream_not_configured", nil, logrus.LogInfo{"record_type": entry.TxType})
p.ReportError("kinesis_produce_stream_not_configured", nil, logrus.LogInfo{"record_type": entry.TxType})
return
}
kinesisRecord := &kinesis.PutRecordInput{
Expand All @@ -85,7 +88,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {

kinesisRecordOutput, err := p.kinesis.PutRecord(kinesisRecord)
if err != nil {
p.logger.ErrorLog("kinesis_err", err, nil)
p.ReportError("kinesis_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
}
Expand All @@ -95,6 +98,12 @@ func (p *Producer) Produce(entry *telemetry.Record) {
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
}

// ReportError to airbrake and logger
func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}
Expand Down
5 changes: 5 additions & 0 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ func (p *ProtoLogger) Produce(entry *telemetry.Record) {
}
p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": string(data)})
}

// ReportError noop method
func (p *ProtoLogger) ReportError(message string, err error, logInfo logrus.LogInfo) {
return
}
26 changes: 19 additions & 7 deletions datastore/zmq/zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand Down Expand Up @@ -58,10 +59,11 @@ const MonitorSocketAddr = "inproc://zmq_socket_monitor.rep"
// ZMQProducer implements the telemetry.Producer interface by publishing to a
// bound zmq socket.
type ZMQProducer struct {
namespace string
ctx context.Context
sock *zmq4.Socket
logger *logrus.Logger
namespace string
ctx context.Context
sock *zmq4.Socket
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
}

// Publish the record to the socket.
Expand All @@ -71,13 +73,19 @@ func (p *ZMQProducer) Produce(rec *telemetry.Record) {
}
if nBytes, err := p.sock.SendMessage(telemetry.BuildTopicName(p.namespace, rec.TxType), rec.Payload()); err != nil {
metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType})
p.logger.ErrorLog("zmq_dispatch_error", err, nil)
p.ReportError("zmq_dispatch_error", err, nil)
} else {
metricsRegistry.byteTotal.Add(int64(nBytes), map[string]string{"record_type": rec.TxType})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": rec.TxType})
}
}

// ReportError to airbrake and logger
func (p *ZMQProducer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

// Close the underlying socket.
func (p *ZMQProducer) Close() error {
if p.sock != nil {
Expand All @@ -90,7 +98,7 @@ func (p *ZMQProducer) Close() error {
}

// NewProducer creates a ZMQProducer with the given config.
func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, logger *logrus.Logger) (producer telemetry.Producer, err error) {
func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (producer telemetry.Producer, err error) {
registerMetricsOnce(metrics)
sock, err := zmq4.NewSocket(zmq4.PUB)
if err != nil {
Expand Down Expand Up @@ -146,7 +154,11 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl
}

return &ZMQProducer{
namespace, ctx, sock, logger,
namespace: namespace,
ctx: ctx,
sock: sock,
logger: logger,
airbrakeHandler: airbrakeHandler,
}, nil
}

Expand Down
Loading

0 comments on commit 8240851

Please sign in to comment.