Skip to content

Commit

Permalink
feat(server): return event count as prom metrics (#1135)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Marton <[email protected]>
Co-authored-by: Krisztian Gacsal <[email protected]>
  • Loading branch information
hekike and chrisgacsal authored Jul 4, 2024
1 parent e394f7e commit 7d35c53
Show file tree
Hide file tree
Showing 16 changed files with 900 additions and 350 deletions.
375 changes: 205 additions & 170 deletions api/api.gen.go

Large diffs are not rendered by default.

454 changes: 284 additions & 170 deletions api/client/go/client.gen.go

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ tags:
description: |
Endpoints related to entitlements.
[Learn more](https://openmeter.io/docs/monetize/entitlements/overview)
- name: Debug
description: |
Debugging endpoints. Useful for debugging and testing.
paths:
/api/v1/events:
get:
Expand Down Expand Up @@ -152,7 +155,6 @@ paths:
$ref: "#/components/responses/UnauthorizedProblemResponse"
default:
$ref: "#/components/responses/UnexpectedProblemResponse"

/api/v1/meters:
get:
operationId: listMeters
Expand Down Expand Up @@ -1158,6 +1160,32 @@ paths:
default:
$ref: "#/components/responses/UnexpectedProblemResponse"

/api/v1/debug/metrics:
get:
summary: Get event metrics
description: |
Returns debug metrics like the number of ingested events since mindnight UTC.
The OpenMetrics Counter(s) reset every day at midnight UTC.
operationId: getDebugMetrics
tags:
- Debug
responses:
"200":
description: Dbeug metrics, like number of ingested events.
content:
text/plain:
schema:
type: string
example: |-
# HELP openmeter_events Number of ingested events
# TYPE openmeter_events counter
openmeter_events_total{subject="customer-1"} 12345.0
openmeter_events_total{subject="customer-1",error="true"} 1.0
"401":
$ref: "#/components/responses/UnauthorizedProblemResponse"
default:
$ref: "#/components/responses/UnexpectedProblemResponse"

components:
schemas:
SharedMetaFields:
Expand Down
3 changes: 3 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/openmeterio/openmeter/internal/credit"
creditpgadapter "github.com/openmeterio/openmeter/internal/credit/postgresadapter"
creditdb "github.com/openmeterio/openmeter/internal/credit/postgresadapter/ent/db"
"github.com/openmeterio/openmeter/internal/debug"
"github.com/openmeterio/openmeter/internal/entitlement"
booleanentitlement "github.com/openmeterio/openmeter/internal/entitlement/boolean"
meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered"
Expand Down Expand Up @@ -296,6 +297,7 @@ func main() {
}
}

debugConnector := debug.NewDebugConnector(streamingConnector)
var entitlementConnector entitlement.Connector
var meteredEntitlementConnector meteredentitlement.Connector
var featureConnector productcatalog.FeatureConnector
Expand Down Expand Up @@ -375,6 +377,7 @@ func main() {
PortalCORSEnabled: conf.Portal.CORS.Enabled,
ErrorHandler: errorsx.NewAppHandler(errorsx.NewSlogHandler(logger)),
// deps
DebugConnector: debugConnector,
FeatureConnector: featureConnector,
EntitlementConnector: entitlementConnector,
EntitlementBalanceConnector: meteredEntitlementConnector,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ require (
github.com/peterbourgon/ctxdata/v4 v4.0.0
github.com/peterldowns/pgtestdb v0.0.14
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.53.0
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
github.com/redis/go-redis/v9 v9.5.3
github.com/sagikazarmark/mapstructurex v0.1.0
Expand Down Expand Up @@ -315,8 +317,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/sftp v1.13.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/pusher/pusher-http-go v4.0.1+incompatible // indirect
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc // indirect
Expand Down
90 changes: 90 additions & 0 deletions internal/debug/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package debug

import (
"bytes"
"context"
"fmt"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/openmeterio/openmeter/internal/streaming"
)

// DebugConnector is a connector for debug metrics.
type DebugConnector interface {
GetDebugMetrics(ctx context.Context, namespace string) (string, error)
}

// debugConnector is the internal implementation of the DebugConnector interface.
type debugConnector struct {
streaming streaming.Connector
}

// NewDebugConnector creates a new DebugConnector.
func NewDebugConnector(streaming streaming.Connector) DebugConnector {
return &debugConnector{
streaming: streaming,
}
}

// GetDebugMetrics returns metrics in an OpenMetrics (Prometheus) format for debugging purposes.
// It is useful to monitor the number of events ingested on the vendor side.
func (c *debugConnector) GetDebugMetrics(ctx context.Context, namespace string) (string, error) {
// Start from the beginning of the day
queryParams := streaming.CountEventsParams{
From: time.Now().Truncate(time.Hour * 24).UTC(),
}

// Query events counts
rows, err := c.streaming.CountEvents(ctx, namespace, queryParams)
if err != nil {
return "", fmt.Errorf("connector count events: %w", err)
}

// Convert to Prometheus metrics
var metrics []*dto.Metric
for _, row := range rows {
metric := &dto.Metric{
Label: []*dto.LabelPair{
{
Name: proto.String("subject"),
Value: proto.String(row.Subject),
},
},
Counter: &dto.Counter{
// We can lose precision here
Value: proto.Float64(float64(row.Count)),
CreatedTimestamp: timestamppb.New(time.Now()),
},
}

if row.IsError {
metric.Label = append(metric.Label, &dto.LabelPair{
Name: proto.String("error"),
Value: proto.String("true"),
})
}

metrics = append(metrics, metric)
}

family := &dto.MetricFamily{
Name: proto.String("openmeter_events_total"),
Help: proto.String("Number of ingested events"),
Type: dto.MetricType_COUNTER.Enum(),
Unit: proto.String("events"),
Metric: metrics,
}

var out bytes.Buffer
_, err = expfmt.MetricFamilyToOpenMetrics(&out, family)
if err != nil {
return "", fmt.Errorf("convert metric family to OpenMetrics: %w", err)
}

return out.String(), nil
}
89 changes: 89 additions & 0 deletions internal/debug/httpdriver/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package httpdriver

import (
"context"
"errors"
"net/http"

"github.com/openmeterio/openmeter/internal/debug"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
"github.com/openmeterio/openmeter/pkg/models"
)

type DebugHandler interface {
GetMetrics() GetMetricsHandler
}

type debugHandler struct {
namespaceDecoder namespacedriver.NamespaceDecoder
debugConnector debug.DebugConnector
options []httptransport.HandlerOption
}

func NewDebugHandler(
namespaceDecoder namespacedriver.NamespaceDecoder,
debugConnector debug.DebugConnector,
options ...httptransport.HandlerOption,
) DebugHandler {
return &debugHandler{
namespaceDecoder: namespaceDecoder,
debugConnector: debugConnector,
options: options,
}
}

type GetMetricsHandlerRequestParams struct {
Namespace string
}

type GetMetricsHandlerRequest struct {
params GetMetricsHandlerRequestParams
}
type GetMetricsHandlerResponse = string
type GetMetricsHandlerParams struct{}
type GetMetricsHandler httptransport.HandlerWithArgs[GetMetricsHandlerRequest, GetMetricsHandlerResponse, GetMetricsHandlerParams]

func (h *debugHandler) GetMetrics() GetMetricsHandler {
return httptransport.NewHandlerWithArgs[GetMetricsHandlerRequest, string, GetMetricsHandlerParams](
func(ctx context.Context, r *http.Request, params GetMetricsHandlerParams) (GetMetricsHandlerRequest, error) {
ns, err := h.resolveNamespace(ctx)
if err != nil {
return GetMetricsHandlerRequest{}, err
}

return GetMetricsHandlerRequest{
params: GetMetricsHandlerRequestParams{
Namespace: ns,
},
}, nil
},
func(ctx context.Context, request GetMetricsHandlerRequest) (string, error) {
return h.debugConnector.GetDebugMetrics(ctx, request.params.Namespace)
},
commonhttp.PlainTextResponseEncoder[string],
httptransport.AppendOptions(
h.options,
httptransport.WithErrorEncoder(func(ctx context.Context, err error, w http.ResponseWriter) bool {
if _, ok := err.(*models.GenericUserError); ok {
commonhttp.NewHTTPError(
http.StatusBadRequest,
err,
).EncodeError(ctx, w)
return true
}
return false
}),
)...,
)
}

func (h *debugHandler) resolveNamespace(ctx context.Context) (string, error) {
ns, ok := h.namespaceDecoder.GetNamespace(ctx)
if !ok {
return "", commonhttp.NewHTTPError(http.StatusInternalServerError, errors.New("internal server error"))
}

return ns, nil
}
13 changes: 13 additions & 0 deletions internal/server/router/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package router

import (
"net/http"

"github.com/openmeterio/openmeter/internal/debug/httpdriver"
)

// Get debug metrics
// (GET /api/v1/debug/metrics)
func (a *Router) GetDebugMetrics(w http.ResponseWriter, r *http.Request) {
a.debugHandler.GetMetrics().With(httpdriver.GetMetricsHandlerParams{}).ServeHTTP(w, r)
}
14 changes: 14 additions & 0 deletions internal/server/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/internal/credit"
credit_httpdriver "github.com/openmeterio/openmeter/internal/credit/httpdriver"
"github.com/openmeterio/openmeter/internal/debug"
debug_httpdriver "github.com/openmeterio/openmeter/internal/debug/httpdriver"
"github.com/openmeterio/openmeter/internal/entitlement"
entitlement_httpdriver "github.com/openmeterio/openmeter/internal/entitlement/httpdriver"
meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered"
Expand Down Expand Up @@ -55,6 +57,7 @@ type Config struct {
ErrorHandler errorsx.Handler

// deps
DebugConnector debug.DebugConnector
FeatureConnector productcatalog.FeatureConnector
EntitlementConnector entitlement.Connector
EntitlementBalanceConnector meteredentitlement.Connector
Expand Down Expand Up @@ -87,6 +90,10 @@ func (c Config) Validate() error {
return errors.New("streaming connector is required")
}

if c.DebugConnector == nil {
return errors.New("debug connector is required")
}

if c.EntitlementsEnabled {
if c.FeatureConnector == nil {
return errors.New("feature connector is required")
Expand All @@ -113,6 +120,7 @@ type Router struct {

featureHandler productcatalog_httpdriver.FeatureHandler
creditHandler credit_httpdriver.GrantHandler
debugHandler debug_httpdriver.DebugHandler
entitlementHandler entitlement_httpdriver.EntitlementHandler
meteredEntitlementHandler entitlement_httpdriver.MeteredEntitlementHandler
}
Expand All @@ -131,6 +139,12 @@ func NewRouter(config Config) (*Router, error) {

staticNamespaceDecoder := namespacedriver.StaticNamespaceDecoder(config.NamespaceManager.GetDefaultNamespace())

router.debugHandler = debug_httpdriver.NewDebugHandler(
staticNamespaceDecoder,
config.DebugConnector,
httptransport.WithErrorHandler(config.ErrorHandler),
)

if config.EntitlementsEnabled {
router.featureHandler = productcatalog_httpdriver.NewFeatureHandler(
config.FeatureConnector,
Expand Down
Loading

0 comments on commit 7d35c53

Please sign in to comment.