From a8b67a6fd4e244f62cccefb732f6c02dca01deae Mon Sep 17 00:00:00 2001 From: Akhigbe Eromosele Date: Mon, 2 Dec 2024 16:23:24 +0100 Subject: [PATCH] First test failed so i had to do some tinkering and refining and put some logging statements Will clean code up once i am done --- exporter/sematextexporter/config.go | 4 +-- exporter/sematextexporter/es.go | 15 +++----- exporter/sematextexporter/exporter.go | 52 +++++++++++++++++---------- exporter/sematextexporter/factory.go | 2 +- 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/exporter/sematextexporter/config.go b/exporter/sematextexporter/config.go index 65dec7f66bfc..6da9ee693e6f 100644 --- a/exporter/sematextexporter/config.go +++ b/exporter/sematextexporter/config.go @@ -63,11 +63,11 @@ func (cfg *Config) Validate() error { } if strings.ToLower(cfg.Region) == "eu" { cfg.MetricsEndpoint ="https://spm-receiver.eu.sematext.com" - cfg.LogsEndpoint ="logsene-receiver.eu.sematext.com/_bulk" + cfg.LogsEndpoint ="https://logsene-receiver.eu.sematext.com" } if strings.ToLower(cfg.Region) == "us"{ cfg.MetricsEndpoint ="https://spm-receiver.sematext.com" - cfg.LogsEndpoint = "logsene-receiver.sematext.com/_bulk" + cfg.LogsEndpoint = "https://logsene-receiver.sematext.com" } return nil diff --git a/exporter/sematextexporter/es.go b/exporter/sematextexporter/es.go index db906f63dd98..1e2fd966a95d 100644 --- a/exporter/sematextexporter/es.go +++ b/exporter/sematextexporter/es.go @@ -16,11 +16,6 @@ import ( json "github.com/json-iterator/go" ) -const ( - // artificialDocType designates a syntenic doc type for ES documents - artificialDocType = "_doc" -) - type group struct { client *elastic.Client token string @@ -49,7 +44,6 @@ func NewClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client if err != nil { return nil, err } - // clients := map[string]group{} clients[config.LogsEndpoint] = group{ client: c, token: config.LogsConfig.AppToken, @@ -74,9 +68,8 @@ func (c *client) Bulk(body interface{}, config *Config) error { v := reflect.ValueOf(body) for i := 0; i < v.Len(); i++ { req := elastic.NewBulkIndexRequest(). - Index(grp.token). - Type(artificialDocType). - Doc(v.Index(i).Interface()) + Index(grp.token). + Doc(v.Index(i).Interface()) bulkRequest.Add(req) } } @@ -125,7 +118,9 @@ func (c *client) Bulk(body interface{}, config *Config) error { func (c *client) writePayload(payload string, status string) { if c.config.WriteEvents.Load() { c.writer.Write(Formatl(payload, status)) - } + } else { + c.logger.Debugf("WriteEvents disabled. Payload: %s, Status: %s", payload, status) + } } // Formatl delimits and formats the response returned by receiver. func Formatl(payload string, status string) string { diff --git a/exporter/sematextexporter/exporter.go b/exporter/sematextexporter/exporter.go index a94d31ae2318..8f145d4f2943 100644 --- a/exporter/sematextexporter/exporter.go +++ b/exporter/sematextexporter/exporter.go @@ -2,7 +2,7 @@ package sematextexporter import ( "context" - // "fmt" + "fmt" "time" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/component" @@ -17,7 +17,7 @@ type sematextLogsExporter struct { logger *logrus.Logger } -func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter { +func newExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter { logger := logrus.New() logger.SetFormatter(&FlatFormatter{}) @@ -37,22 +37,27 @@ func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter { func (e *sematextLogsExporter) pushLogsData(ctx context.Context, logs plog.Logs) error { - // Convert logs to Bulk API payload - bulkPayload, err := convertLogsToBulkPayload(logs) - if err != nil { - e.logger.Errorf("Failed to convert logs: %v", err) - return err - } + // Convert logs to bulk payload + bulkPayload, err := convertLogsToBulkPayload(logs, e.config.LogsConfig.AppToken) + if err != nil { + e.logger.Errorf("Failed to convert logs: %v", err) + return err + } - // Send logs using the Sematext client - if err := e.client.Bulk(bulkPayload, e.config); err != nil { - e.logger.Errorf("Failed to send logs to Sematext: %v", err) - return err - } + // Debug: Print the bulk payload + for _, payload := range bulkPayload { + fmt.Printf("Bulk payload: %v\n", payload) + } - return nil + // Send the bulk payload to Sematext + if err := e.client.Bulk(bulkPayload, e.config); err != nil { + e.logger.Errorf("Failed to send logs to Sematext: %v", err) + return err + } + + return nil } -func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) { +func convertLogsToBulkPayload(logs plog.Logs, appToken string) ([]map[string]interface{}, error) { var bulkPayload []map[string]interface{} resourceLogs := logs.ResourceLogs() @@ -65,13 +70,20 @@ func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) for k := 0; k < logRecords.Len(); k++ { record := logRecords.At(k) + // Add metadata for indexing + meta := map[string]interface{}{ + "index": map[string]interface{}{ + "_index": appToken, + }, + } + bulkPayload = append(bulkPayload, meta) + // Build the log entry logEntry := map[string]interface{}{ "@timestamp": record.Timestamp().AsTime().Format(time.RFC3339), "message": record.Body().AsString(), "severity": record.SeverityText(), } - bulkPayload = append(bulkPayload, logEntry) } } @@ -80,10 +92,12 @@ func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) return bulkPayload, nil } func (e *sematextLogsExporter) Start(ctx context.Context, host component.Host) error { - e.logger.Info("Starting Sematext Logs Exporter...") - return nil + if e.client == nil { + return fmt.Errorf("sematext client is not initialized") + } + e.logger.Info("Starting Sematext Logs Exporter...") + return nil } - func (e *sematextLogsExporter) Shutdown(ctx context.Context) error { e.logger.Info("Shutting down Sematext Logs Exporter...") return nil diff --git a/exporter/sematextexporter/factory.go b/exporter/sematextexporter/factory.go index b58a0e650c3e..42beba2887b7 100644 --- a/exporter/sematextexporter/factory.go +++ b/exporter/sematextexporter/factory.go @@ -115,7 +115,7 @@ func createLogsExporter( set.Logger.Info("Creating Sematext Logs Exporter") // Create the Sematext logs exporter - exporter := NewExporter(cf, set) + exporter := newExporter(cf, set) // Wrap the exporter with OpenTelemetry helper functions return exporterhelper.NewLogsExporter(