diff --git a/README.md b/README.md index d94ae72..14526dc 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ Databricks) and/or Spark telemetry from any Spark deployment. See the * [Authentication](#authentication) * [Consumption & Cost Data](#consumption--cost-data) * [Job Run Data](#job-run-data) + * [Pipeline Event Logs](#pipeline-event-logs) * [Building](#building) * [Coding Conventions](#coding-conventions) * [Local Development](#local-development) @@ -152,6 +153,17 @@ to add the following environment variables. the OAuth client ID for the service principal * `NEW_RELIC_DATABRICKS_OAUTH_CLIENT_SECRET` - To [use a service principal to authenticate with Databricks (OAuth M2M)](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html), an OAuth client secret associated with the service principal +* `NEW_RELIC_DATABRICKS_USAGE_ENABLED` - Set to `true` to enable collection of + [consumption and cost data](#consumption--cost-data) from this cluster node or + `false` to disable collection. Defaults to `true`. +* `NEW_RELIC_DATABRICKS_SQL_WAREHOUSE` - The ID of a [SQL warehouse](https://docs.databricks.com/en/compute/sql-warehouse/index.html) + where usage queries should be run +* `NEW_RELIC_DATABRICKS_JOB_RUNS_ENABLED` - Set to `true` to enable collection + of [job run data](#job-run-data) from this cluster node or `false` to disable + collection. Defaults to `true`. +* `NEW_RELIC_DATABRICKS_PIPELINE_EVENT_LOGS_ENABLED` - Set to `true` to enable + collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs) + from this cluster node or `false` to disable collection. Defaults to `true`. Note that the `NEW_RELIC_API_KEY` and `NEW_RELIC_ACCOUNT_ID` are currently unused but are required by the [new-relic-client-go](https://github.com/newrelic/newrelic-client-go) @@ -181,6 +193,23 @@ The New Relic Databricks integration supports the following capabilities. This data can be used to show Databricks DBU consumption metrics and estimated Databricks costs directly within New Relic. +* Collect Databricks job run telemetry + + The New Relic Databricks integration can collect telemetry about + [Databricks Job](https://docs.databricks.com/en/jobs/index.html#what-are-databricks-jobs) + runs, such as job run durations, task run durations, the current state of job + and task runs, if a job or a task is a retry, and the number of times a task + was retried. + +* Collect Databricks Delta Live Tables Pipeline event logs + + The New Relic Databricks integration can collect [Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) + for all [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html) + defined in a [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces). + [Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) + entries for every [pipeline update](https://docs.databricks.com/en/delta-live-tables/updates.html) + are collected and sent to [New Relic Logs](https://docs.newrelic.com/docs/logs/get-started/get-started-log-management/). + ## Usage ### Command Line Options @@ -551,6 +580,16 @@ the Databricks collector settings related to the collection of This element groups together the configuration parameters to [configure](#databricks-job-configuration) the Databricks collector settings related to the collection of job data. 
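Both the `jobs` element above and the `pipelines` element introduced below surface inside the collector as dotted configuration keys (the `databricks.pipelines.logs.enabled` key is read verbatim in `internal/databricks/databricks.go` later in this change). The following is a minimal, self-contained sketch of how such a nested flag can be resolved with a default of `true`; note that the `databricks.jobs.runs.enabled` key name is an assumption inferred from `configs/config.template.yml`, not copied from the collector source.

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// boolWithDefault mirrors the pattern used in this change for
// databricks.pipelines.logs.enabled: a key that is absent from the
// configuration falls back to its documented default.
func boolWithDefault(key string, def bool) bool {
	if viper.IsSet(key) {
		return viper.GetBool(key)
	}
	return def
}

func main() {
	// Assumed key name, inferred from the config template in this diff.
	collectJobRuns := boolWithDefault("databricks.jobs.runs.enabled", true)

	// Key name used verbatim by the pipeline event logs change below.
	collectPipelineEventLogs := boolWithDefault(
		"databricks.pipelines.logs.enabled",
		true,
	)

	fmt.Println(collectJobRuns, collectPipelineEventLogs)
}
```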
+###### `pipelines` + +| Description | Valid Values | Required | Default | +| --- | --- | --- | --- | +| The root node for the set of [Databricks Pipeline configuration](#databricks-pipeline-configuration) parameters | YAML Mapping | N | N/a | + +This element groups together the configuration parameters to [configure](#databricks-pipeline-configuration) +the Databricks collector settings related to the collection of [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html) +telemetry. + ##### Databricks `spark` configuration ###### Databricks Spark `enabled` @@ -853,6 +892,44 @@ particular time in the past calculated as an offset from the current time. See the section [`startOffset` Configuration](#startoffset-configuration) for more details. +##### Databricks Pipeline configuration + +###### `logs` + +| Description | Valid Values | Required | Default | +| --- | --- | --- | --- | +| The root node for the set of [Databricks Pipeline Event Logs configuration](#databricks-pipeline-event-logs-configuration) parameters | YAML Mapping | N | N/a | + +This element groups together the configuration parameters to [configure](#databricks-pipeline-event-logs-configuration) +the Databricks collector settings related to the collection of Databricks Delta +Live Tables [pipeline event logs](#pipeline-event-logs). + +##### Databricks pipeline event logs configuration + +The Databricks pipeline event logs configuration parameters are used to +configure Databricks collector settings related to the collection of Databricks +Delta Live Tables [pipeline event logs](#pipeline-event-logs). + +###### Databricks pipeline event logs `enabled` + +| Description | Valid Values | Required | Default | +| --- | --- | --- | --- | +| Flag to enable automatic collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs) | `true` / `false` | N | `true` | + +By default, when the Databricks collector is enabled, it will automatically +collect Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs). + +This flag can be used to disable the collection of Databricks Delta Live Tables +[pipeline event logs](#pipeline-event-logs) by the Databricks collector. This +may be useful when running multiple instances of the New Relic Databricks +integration against the same Databricks [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces). +In this scenario, the collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs) +should _only_ be enabled on a single instance of the integration. Otherwise, +duplicate New Relic log entries will be created for each Databricks Delta Live +Tables [pipeline event log](#pipeline-event-logs) entry, making troubleshooting +challenging and affecting product features that rely on signal integrity such as +[anomaly detection](https://docs.newrelic.com/docs/alerts/create-alert/set-thresholds/anomaly-detection/). + ##### Spark configuration The Spark configuration parameters are used to configure the Spark collector. @@ -1472,6 +1549,75 @@ statements to use to visualize the data. 
![Sample job runs dashboard image](./examples/job-runs-dashboard.png) +### Pipeline Event Logs + +The New Relic Databricks integration can collect [Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) +for all [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html) +defined in a [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces). +[Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) +entries for every [pipeline update](https://docs.databricks.com/en/delta-live-tables/updates.html) +are collected and sent to [New Relic Logs](https://docs.newrelic.com/docs/logs/get-started/get-started-log-management/). + +**NOTE:** Some of the text below is sourced from the +[Databricks Delta Live Tables pipeline event log schema documentation](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log-schema) +and the [Databricks SDK Go module documentation](https://pkg.go.dev/github.com/databricks/databricks-sdk-go). + +#### Pipeline Event Log Data + +[Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) +are sent to New Relic as [New Relic Log data](https://docs.newrelic.com/docs/data-apis/understand-data/new-relic-data-types/#log-data). +For each [Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) +entry, the [fields from the event log entry](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log-schema) +are mapped to attributes on the corresponding New Relic log entry as follows. + +**NOTE:** Due to the way in which the [Databricks ReST API](https://docs.databricks.com/api/workspace/introduction) +returns [Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log) +entries (in descending order by timestamp), later event log entries may +sometimes be visible in the [New Relic Logs UI](https://docs.newrelic.com/docs/logs/ui-data/use-logs-ui/) +before older event log entries are visible. This does not affect the temporal +ordering of the log entries. Once older log entries become visible (usually +30-60s longer than the value of the [`interval` configuration parameter](#interval)), +they are properly ordered by timestamp in the [New Relic Logs UI](https://docs.newrelic.com/docs/logs/ui-data/use-logs-ui/). 
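The ordering guarantee described in the note above follows from how each record is timestamped: the receiver added in `internal/databricks/pipelines.go` (see the end of this diff) parses the event's own timestamp and attaches it to the New Relic log record, rather than stamping the record with collection time. Below is a small, self-contained sketch of that parsing step; the sample timestamp value is made up.

```go
package main

import (
	"fmt"
	"time"
)

// RFC_3339_MILLI_LAYOUT matches the layout constant defined in
// internal/databricks/pipelines.go in this change.
const RFC_3339_MILLI_LAYOUT = "2006-01-02T15:04:05.000Z07:00"

func main() {
	// Illustrative value; real timestamps come from the Databricks pipeline
	// events API.
	raw := "2024-06-01T12:34:56.789Z"

	eventTimestamp, err := time.Parse(RFC_3339_MILLI_LAYOUT, raw)
	if err != nil {
		panic(err)
	}

	// The parsed event time, not the poll time, is attached to each New Relic
	// log record, which is why entries settle into timestamp order in the
	// Logs UI even though the API returns them newest-first.
	fmt.Println(eventTimestamp.UnixMilli())
}
```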
+ +##### Pipeline event log attributes + +| Pipeline Event Log Field Name | New Relic Log Entry Attribute Name | Data Type in New Relic Log Entry | Description | +| --- | --- | --- | --- | +| `message` | `message` | string | A human-readable message describing the event | +| `timestamp` | `timestamp` | integer | The time (in milliseconds since the epoch) the event was recorded | +| `id` | `databricksPipelineEventId` | string | A unique identifier for the event log record | +| `event_type` | `databricksPipelineEventType` | string | The event type | +| `level` | `level`, `databricksPipelineEventLevel` | string | The severity level of the event, for example, `INFO`, `WARN`, `ERROR`, or `METRICS` | +| `maturity_level` | `databricksPipelineEventMaturityLevel` | string | The stability of the event schema, one of `STABLE`, `NULL`, `EVOLVING`, or `DEPRECATED` | +| n/a | `databricksPipelineEventError` | boolean | `true` if an error was captured by the event (see below for additional details), otherwise `false` | +| `origin.batch_id` | `databricksPipelineEventBatchId` | integer | The id of a batch. Unique within a flow. | +| `origin.cloud` | `databricksPipelineEventCloud` | string | The cloud provider, e.g., AWS or Azure | +| `origin.cluster_id` | `databricksPipelineEventClusterId` | string | The id of the cluster where an execution happens. Unique within a region. | +| `origin.dataset_name` | `databricksPipelineEventDatasetName` | string | The name of a dataset. Unique within a pipeline. | +| `origin.flow_id` | `databricksPipelineEventFlowId` | string | The id of the flow. Globally unique. | +| `origin.flow_name` | `databricksPipelineEventFlowName` | string | The name of the flow. Not unique. | +| `origin.host` | `databricksPipelineEventHost` | string | The optional host name where the event was triggered | +| `origin.maintenance_id` | `databricksPipelineEventMaintenanceId` | string | The id of a maintenance run. Globally unique. | +| `origin.materialization_name` | `databricksPipelineEventMaterializationName` | string | Materialization name | +| `origin.org_id` | `databricksPipelineEventOrgId` | integer | The org id of the user. Unique within a cloud. | +| `origin.pipeline_id` | `databricksPipelineEventPipelineId` | string | The id of the pipeline. Globally unique. | +| `origin.pipeline_name` | `databricksPipelineEventPipelineName` | string | The name of the pipeline. Not unique. | +| `origin.region` | `databricksPipelineEventRegion` | string | The cloud region | +| `origin.request_id` | `databricksPipelineEventRequestId` | string | The id of the request that caused an update | +| `origin.table_id` | `databricksPipelineEventTableId` | string | The id of a (delta) table. Globally unique. | +| `origin.uc_resource_id` | `databricksPipelineEventUcResourceId` | string | The Unity Catalog id of the MV or ST being updated | +| `origin.update_id` | `databricksPipelineEventUpdateId` | string | The id of an execution. Globally unique. | + +Additionally, if the `error` field is set on the pipeline event log entry, +indicating an error was captured by the event, the following attributes are +added to the New Relic log entry. 
+ +| Pipeline Event Log Field Name | New Relic Log Entry Attribute Name | Data Type in New Relic Log Entry | Description | +| --- | --- | --- | --- | +| `error.fatal` | `databricksPipelineEventErrorFatal` | boolean | Whether the error is considered fatal, that is, unrecoverable | +| `error.exceptions[N].class_name` | `databricksPipelineEventErrorExceptionNClassName` | string | Runtime class of the `N`th exception | +| `error.exceptions[N].message` | `databricksPipelineEventErrorExceptionNMessage` | string | Exception message of the `N`th exception | + ## Building ### Coding Conventions diff --git a/cmd/databricks/databricks.go b/cmd/databricks/databricks.go index c1cd152..5a0eb7a 100644 --- a/cmd/databricks/databricks.go +++ b/cmd/databricks/databricks.go @@ -29,6 +29,7 @@ func main() { integration.WithApiKey(), integration.WithEvents(ctx), integration.WithLastUpdate(), + integration.WithLogs(ctx), ) fatalIfErr(err) diff --git a/configs/config.template.yml b/configs/config.template.yml index 9f702db..7699198 100644 --- a/configs/config.template.yml +++ b/configs/config.template.yml @@ -41,6 +41,9 @@ databricks: includeIdentityMetadata: false includeRunId: false startOffset: 86400 + pipelines: + logs: + enabled: true spark: webUiUrl: http://localhost:4040 diff --git a/examples/dlt_pipeline_events_logs_ui.png b/examples/dlt_pipeline_events_logs_ui.png new file mode 100644 index 0000000..10399d5 Binary files /dev/null and b/examples/dlt_pipeline_events_logs_ui.png differ diff --git a/go.mod b/go.mod index cf8747c..c668fbf 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.22.4 require ( - github.com/databricks/databricks-sdk-go v0.46.0 + github.com/databricks/databricks-sdk-go v0.52.0 github.com/newrelic/newrelic-client-go/v2 v2.45.0 github.com/newrelic/newrelic-labs-sdk/v2 v2.1.0 ) diff --git a/go.sum b/go.sum index 1579300..1414704 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/containerd v1.3.7/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= -github.com/databricks/databricks-sdk-go v0.46.0 h1:D0TxmtSVAOsdnfzH4OGtAmcq+8TyA7Z6fA6JEYhupeY= -github.com/databricks/databricks-sdk-go v0.46.0/go.mod h1:ds+zbv5mlQG7nFEU5ojLtgN/u0/9YzZmKQES/CfedzU= +github.com/databricks/databricks-sdk-go v0.52.0 h1:WKcj0F+pdx0gjI5xMicjYC4O43S2q5nyTpaGGMFmgHw= +github.com/databricks/databricks-sdk-go v0.52.0/go.mod h1:ds+zbv5mlQG7nFEU5ojLtgN/u0/9YzZmKQES/CfedzU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/init/cluster_init_integration.sh b/init/cluster_init_integration.sh index 11975e8..2e9359d 100755 --- a/init/cluster_init_integration.sh +++ b/init/cluster_init_integration.sh @@ -33,7 +33,23 @@ databricks: accessToken: "$NEW_RELIC_DATABRICKS_ACCESS_TOKEN" oauthClientId: "$NEW_RELIC_DATABRICKS_OAUTH_CLIENT_ID" oauthClientSecret: "$NEW_RELIC_DATABRICKS_OAUTH_CLIENT_SECRET" - sparkMetrics: false + spark: + enabled: false + usage: + enabled: $NEW_RELIC_DATABRICKS_USAGE_ENABLED + warehouseId: 
"$NEW_RELIC_DATABRICKS_SQL_WAREHOUSE" + includeIdentityMetadata: false + runTime: 02:00:00 + jobs: + runs: + enabled: $NEW_RELIC_DATABRICKS_JOB_RUNS_ENABLED + metricPrefix: databricks. + includeIdentityMetadata: false + includeRunId: false + startOffset: 86400 + pipelines: + logs: + enabled: $NEW_RELIC_DATABRICKS_PIPELINE_EVENT_LOGS_ENABLED spark: webUiUrl: http://{UI_HOST}:{UI_PORT} metricPrefix: spark. diff --git a/internal/databricks/databricks.go b/internal/databricks/databricks.go index 4aaee6e..6724946 100644 --- a/internal/databricks/databricks.go +++ b/internal/databricks/databricks.go @@ -179,6 +179,29 @@ func InitPipelines( i.AddComponent(mp) } + collectPipelineEventLogs := true + if viper.IsSet("databricks.pipelines.logs.enabled") { + collectPipelineEventLogs = viper.GetBool( + "databricks.pipelines.logs.enabled", + ) + } + + if collectPipelineEventLogs { + databricksPipelineEventsReceiver := + NewDatabricksPipelineEventsReceiver(i, w, tags) + + // Create a logs pipeline for the event logs + lp := pipeline.NewLogsPipeline( + "databricks-pipeline-event-logs-pipeline", + ) + lp.AddReceiver(databricksPipelineEventsReceiver) + lp.AddExporter(newRelicExporter) + + log.Debugf("initializing Databricks pipeline event logs pipeline") + + i.AddComponent(lp) + } + return nil } diff --git a/internal/databricks/jobs.go b/internal/databricks/jobs.go index 1c7a141..ae1c611 100644 --- a/internal/databricks/jobs.go +++ b/internal/databricks/jobs.go @@ -516,35 +516,3 @@ func writeCounters( writer, ) } - -func writeGauge( - prefix string, - metricName string, - metricValue any, - attrs map[string]interface{}, - writer chan <- model.Metric, -) { - metric := model.NewGaugeMetric( - prefix + metricName, - model.MakeNumeric(metricValue), - time.Now(), - ) - - for k, v := range attrs { - metric.Attributes[k] = v - } - - writer <- metric -} - -func makeAttributesMap( - tags map[string]string, -) map[string]interface{} { - attrs := make(map[string]interface{}) - - for k, v := range tags { - attrs[k] = v - } - - return attrs -} diff --git a/internal/databricks/pipelines.go b/internal/databricks/pipelines.go new file mode 100644 index 0000000..8d391ec --- /dev/null +++ b/internal/databricks/pipelines.go @@ -0,0 +1,184 @@ +package databricks + +import ( + "context" + "fmt" + "time" + + databricksSdk "github.com/databricks/databricks-sdk-go" + databricksSdkPipelines "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration" + "github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/log" + "github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/model" +) + +const ( + RFC_3339_MILLI_LAYOUT = "2006-01-02T15:04:05.000Z07:00" +) + +type DatabricksPipelineEventsReceiver struct { + i *integration.LabsIntegration + w *databricksSdk.WorkspaceClient + tags map[string]string +} + +func NewDatabricksPipelineEventsReceiver( + i *integration.LabsIntegration, + w *databricksSdk.WorkspaceClient, + tags map[string]string, +) *DatabricksPipelineEventsReceiver { + return &DatabricksPipelineEventsReceiver{ + i, + w, + tags, + } +} + +func (d *DatabricksPipelineEventsReceiver) GetId() string { + return "databricks-pipeline-events-receiver" +} + +func (d *DatabricksPipelineEventsReceiver) PollLogs( + ctx context.Context, + writer chan <- model.Log, +) error { + lastRun := time.Now().Add(-d.i.Interval * time.Second) + + all := d.w.Pipelines.ListPipelines( + ctx, + databricksSdkPipelines.ListPipelinesRequest{}, + ) + + LOOP: + + for ; all.HasNext(ctx); { + 
pipelineStateInfo, err := all.Next(ctx) + if err != nil { + return err + } + + log.Debugf( + "processing pipeline events for pipeline %s (%s) with state %s", + pipelineStateInfo.PipelineId, + pipelineStateInfo.Name, + pipelineStateInfo.State, + ) + + allEvents := d.w.Pipelines.ListPipelineEvents( + ctx, + databricksSdkPipelines.ListPipelineEventsRequest{ + Filter: "timestamp >= '" + + lastRun.UTC().Format(RFC_3339_MILLI_LAYOUT) + + "'", + PipelineId: pipelineStateInfo.PipelineId, + }, + ) + + count := 0 + + for ; allEvents.HasNext(ctx); { + pipelineEvent, err := allEvents.Next(ctx) + if err != nil { + log.Warnf( + "unexpected no more items error or failed to fetch next page of pipeline events for pipeline %s (%s): %v", + pipelineStateInfo.PipelineId, + pipelineStateInfo.Name, + err, + ) + continue LOOP + } + + // eventTimestamp is not used until the end of the loop but by + // parsing here we can abort if an error occurs and skip allocating + // and populating the attributes map. + + eventTimestamp, err := time.Parse( + RFC_3339_MILLI_LAYOUT, + pipelineEvent.Timestamp, + ) + if err != nil { + log.Warnf( + "ignoring event with ID %s with invalid timestamp %s for pipeline %s (%s): %v", + pipelineEvent.Id, + pipelineEvent.Timestamp, + pipelineStateInfo.PipelineId, + pipelineStateInfo.Name, + err, + ) + continue + } + + attrs := makeAttributesMap(d.tags) + + attrs["databricksPipelineEventId"] = pipelineEvent.Id + attrs["databricksPipelineEventType"] = pipelineEvent.EventType + attrs["level"] = pipelineEvent.Level + attrs["databricksPipelineEventLevel"] = pipelineEvent.Level + attrs["databricksPipelineEventMaturityLevel"] = + pipelineEvent.MaturityLevel + + isError := pipelineEvent.Error != nil + + attrs["databricksPipelineEventError"] = isError + + if isError { + attrs["databricksPipelineEventErrorFatal"] = + pipelineEvent.Error.Fatal + + if len(pipelineEvent.Error.Exceptions) > 0 { + for i, e := range pipelineEvent.Error.Exceptions { + attrName := fmt.Sprintf( + "databricksPipelineEventErrorException%dClassName", + i + 1, + ) + attrs[attrName] = e.ClassName + + attrName = fmt.Sprintf( + "databricksPipelineEventErrorException%dMessage", + i + 1, + ) + attrs[attrName] = e.Message + } + } + } + + if pipelineEvent.Origin != nil { + origin := pipelineEvent.Origin + + attrs["databricksPipelineEventBatchId"] = origin.BatchId + attrs["databricksPipelineEventCloud"] = origin.Cloud + attrs["databricksPipelineEventClusterId"] = origin.ClusterId + attrs["databricksPipelineEventDatasetName"] = origin.DatasetName + attrs["databricksPipelineEventFlowId"] = origin.FlowId + attrs["databricksPipelineEventFlowName"] = origin.FlowName + attrs["databricksPipelineEventHost"] = origin.Host + attrs["databricksPipelineEventMaintenanceId"] = + origin.MaintenanceId + attrs["databricksPipelineEventMaterializationName"] = + origin.MaterializationName + attrs["databricksPipelineEventOrgId"] = origin.OrgId + attrs["databricksPipelineEventPipelineId"] = origin.PipelineId + attrs["databricksPipelineEventPipelineName"] = + origin.PipelineName + attrs["databricksPipelineEventRegion"] = origin.Region + attrs["databricksPipelineEventRequestId"] = origin.RequestId + attrs["databricksPipelineEventTableId"] = origin.TableId + attrs["databricksPipelineEventUcResourceId"] = + origin.UcResourceId + attrs["databricksPipelineEventUpdateId"] = origin.UpdateId + } + + writer <- model.NewLog( + pipelineEvent.Message, + attrs, + eventTimestamp, + ) + + count += 1 + } + + log.Debugf("received %d pipeline events", count) + } + + return nil +} 
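The numbered exception attributes documented earlier in the README (for example, `databricksPipelineEventErrorException1ClassName`) are produced by the `fmt.Sprintf` loop in `PollLogs` above. Here is a standalone sketch of that flattening; the exception type is a stand-in defined only to keep the example self-contained, and the exception values are made up.

```go
package main

import "fmt"

// serializedException stands in for the two fields of the Databricks SDK
// exception type that PollLogs reads.
type serializedException struct {
	ClassName string
	Message   string
}

func main() {
	attrs := map[string]interface{}{}

	// Made-up exception chain, for illustration only.
	exceptions := []serializedException{
		{ClassName: "org.apache.spark.SparkException", Message: "Job aborted."},
		{ClassName: "java.io.FileNotFoundException", Message: "dbfs:/input/missing.csv"},
	}

	// Same flattening as PollLogs: attribute names are 1-based, so the first
	// exception becomes databricksPipelineEventErrorException1ClassName.
	for i, e := range exceptions {
		attrs[fmt.Sprintf(
			"databricksPipelineEventErrorException%dClassName", i+1,
		)] = e.ClassName
		attrs[fmt.Sprintf(
			"databricksPipelineEventErrorException%dMessage", i+1,
		)] = e.Message
	}

	fmt.Println(attrs)
}
```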
diff --git a/internal/databricks/util.go b/internal/databricks/util.go new file mode 100644 index 0000000..3de77d8 --- /dev/null +++ b/internal/databricks/util.go @@ -0,0 +1,39 @@ +package databricks + +import ( + "time" + + "github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/model" +) + +func writeGauge( + prefix string, + metricName string, + metricValue any, + attrs map[string]interface{}, + writer chan <- model.Metric, +) { + metric := model.NewGaugeMetric( + prefix + metricName, + model.MakeNumeric(metricValue), + time.Now(), + ) + + for k, v := range attrs { + metric.Attributes[k] = v + } + + writer <- metric +} + +func makeAttributesMap( + tags map[string]string, +) map[string]interface{} { + attrs := make(map[string]interface{}) + + for k, v := range tags { + attrs[k] = v + } + + return attrs +}
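For context on how the relocated helpers above are meant to be used together, here is a hypothetical caller in the same package; the metric name, attribute, and tag values are invented for illustration and do not correspond to an actual metric emitted by the integration.

```go
package databricks

import (
	"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/model"
)

// writeClusterCountGauge is a hypothetical example, not part of this change.
func writeClusterCountGauge(
	tags map[string]string,
	clusterCount int,
	writer chan<- model.Metric,
) {
	// Start from a copy of the integration-level tags...
	attrs := makeAttributesMap(tags)

	// ...then layer on any metric-specific attributes.
	attrs["databricksWorkspaceHost"] = "example.cloud.databricks.com"

	// writeGauge stamps the metric with the current time and forwards it to
	// the exporter channel.
	writeGauge("databricks.", "cluster.count", clusterCount, attrs, writer)
}
```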