Skip to content

Commit

Permalink
Fix Headers were not being properly sent in OtlpHttpLogExporter (#513)
Browse files Browse the repository at this point in the history
Fix Headers were not being properly sent in OtlpHttpLogExporter or StableOtlpMetricExporter
Given some xcformat love to some related files
  • Loading branch information
Ignacio Bonafonte authored Feb 8, 2024
1 parent 0033445 commit 297d91f
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 460 deletions.
106 changes: 52 additions & 54 deletions Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,66 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpLogExporter : LogRecordExporter {
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions
public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
public class OtlpLogExporter: LogRecordExporter {
let channel: GRPCChannel
var logClient: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}
}

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpMetricExporter: MetricExporter {
let channel: GRPCChannel
var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions? = nil


let config: OtlpConfiguration
var callOptions: CallOptions?

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
self.config = config
self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
Expand All @@ -33,38 +31,37 @@ public class OtlpMetricExporter: MetricExporter {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest
.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: metrics)
}

if config.timeout > 0 {
metricClient.defaultCallOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}

let export = metricClient.export(exportRequest, callOptions: callOptions)

do {
_ = try export.response.wait()
return .success
} catch {
return .failureRetryable
}
}

public func flush() -> SpanExporterResultCode {
return .success
}

public func shutdown() {
_ = channel.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public class StableOtlpMetricExporter: StableMetricExporter {
public func getAggregationTemporality(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.AggregationTemporality {
return aggregationTemporalitySelector.getAggregationTemporality(for: instrument)
}

let channel: GRPCChannel
var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions?
var aggregationTemporalitySelector: AggregationTemporalitySelector
var defaultAggregationSelector: DefaultAggregationSelector

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(),
defaultAggregationSelector: DefaultAggregationSelector = AggregationSelector.instance,
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes)
Expand All @@ -31,7 +31,7 @@ public class StableOtlpMetricExporter: StableMetricExporter {
self.aggregationTemporalitySelector = aggregationTemporalitySelector
self.channel = channel
self.config = config
self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
if let headers = envVarHeaders {
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
} else if let headers = config.headers {
Expand All @@ -40,7 +40,7 @@ public class StableOtlpMetricExporter: StableMetricExporter {
callOptions = CallOptions(logger: logger)
}
}

public func export(metrics: [OpenTelemetrySdk.StableMetricData]) -> OpenTelemetrySdk.ExportResult {
let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: metrics)
Expand All @@ -56,14 +56,14 @@ public class StableOtlpMetricExporter: StableMetricExporter {
return .failure
}
}

public func flush() -> OpenTelemetrySdk.ExportResult {
return .success
}

public func shutdown() -> OpenTelemetrySdk.ExportResult {
_ = channel.close()

return .success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,65 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpTraceExporter: SpanExporter {

let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}
}


public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}
Loading

0 comments on commit 297d91f

Please sign in to comment.