Skip to content

Commit

Permalink
Create viewership API (#52)
Browse files Browse the repository at this point in the history
* api: Create separate helper for stream health APIs

* api: Make cors a handler middleware as well

* go.mod: Add prometheus and LP API libs

* views: Create views client with start views query

* views: Create client constructor

* api: Create views API

* cmd: Initialize views client

* api: Actually register viewership handlers

ops

* views: Fix handling of prometheus response

* views: Create helper for start views query

* views: Get only playbackID on query

* [DEV-REVERT]: Allow querying for any playback ID

Just some testing hack

* views: Update query

* views: Query by playbackRecordingId as well

* Revert "[DEV-REVERT]: Allow querying for any playback ID"

This reverts commit 30e03cd.
  • Loading branch information
victorges authored Sep 9, 2022
1 parent 5ad4b61 commit b2a1cac
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 20 deletions.
41 changes: 34 additions & 7 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/jsse"
"github.com/livepeer/livepeer-data/views"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -32,17 +33,26 @@ type apiHandler struct {
opts APIHandlerOptions
serverCtx context.Context
core *health.Core
views *views.Client
}

func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *health.Core) http.Handler {
handler := &apiHandler{opts, serverCtx, healthcore}
func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *health.Core, views *views.Client) http.Handler {
handler := &apiHandler{opts, serverCtx, healthcore, views}

router := httprouter.New()
router.HandlerFunc("GET", "/_healthz", handler.healthcheck)
if opts.Prometheus {
router.Handler("GET", "/metrics", promhttp.Handler())
}
addStreamHealthHandlers(router, handler)
addViewershipHandlers(router, handler)

globalMiddlewares := []middleware{handler.cors()}
return prepareHandler("", false, router, globalMiddlewares...)
}

func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) {
healthcore, opts := handler.core, handler.opts
middlewares := []middleware{
streamStatus(healthcore, "streamId"),
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
Expand All @@ -57,15 +67,23 @@ func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *h
}
addApiHandler("/health", "get_stream_health", handler.getStreamHealth)
addApiHandler("/events", "stream_health_events", handler.subscribeEvents)
}

globalMiddlewares := []middleware{cors(opts.ServerName)}
return prepareHandler("", false, router, globalMiddlewares...)
func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) {
opts := handler.opts
// TODO: Add authorization to views API
addApiHandler := func(apiPath, name string, handler http.HandlerFunc) {
fullPath := path.Join(opts.APIRoot, "/views/:assetId", apiPath)
fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler)
router.Handler("GET", fullPath, fullHandler)
}
addApiHandler("/total", "get_total_views", handler.getTotalViews)
}

func cors(server string) middleware {
func (h *apiHandler) cors() middleware {
return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) {
if server != "" {
rw.Header().Set("Server", server)
if h.opts.ServerName != "" {
rw.Header().Set("Server", h.opts.ServerName)
}
rw.Header().Set("Access-Control-Allow-Origin", "*")
rw.Header().Set("Access-Control-Allow-Headers", "*")
Expand All @@ -81,6 +99,15 @@ func (h *apiHandler) healthcheck(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(status)
}

func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
views, err := h.views.GetTotalViews(r.Context(), apiParam(r, "assetId"))
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}
respondJson(rw, http.StatusOK, views)
}

func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) {
respondJson(rw, http.StatusOK, getStreamStatus(r))
}
Expand Down
5 changes: 3 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/golang/glog"
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/views"
"golang.org/x/sync/errgroup"
)

Expand All @@ -19,10 +20,10 @@ type ServerOptions struct {
APIHandlerOptions
}

func ListenAndServe(ctx context.Context, opts ServerOptions, healthcore *health.Core) error {
func ListenAndServe(ctx context.Context, opts ServerOptions, healthcore *health.Core, views *views.Client) error {
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", opts.Host, opts.Port),
Handler: NewHandler(ctx, opts.APIHandlerOptions, healthcore),
Handler: NewHandler(ctx, opts.APIHandlerOptions, healthcore, views),
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
Expand Down
15 changes: 14 additions & 1 deletion cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/livepeer/livepeer-data/health/reducers"
"github.com/livepeer/livepeer-data/pkg/event"
"github.com/livepeer/livepeer-data/pkg/mistconnector"
"github.com/livepeer/livepeer-data/views"
"github.com/peterbourgon/ff"
)

Expand All @@ -37,6 +38,8 @@ type cliFlags struct {
serverOpts api.ServerOptions
streamingOpts health.StreamingOptions
memoryRecordsTtl time.Duration

viewsOpts views.ClientOptions
}

func parseFlags(version string) cliFlags {
Expand Down Expand Up @@ -71,6 +74,11 @@ func parseFlags(version string) cliFlags {
fs.DurationVar(&cli.streamingOpts.EventFlowSilenceTolerance, "event-flow-silence-tolerance", 10*time.Minute, "The time to tolerate getting zero messages in the stream before giving an error on the service healthcheck")
fs.DurationVar(&cli.memoryRecordsTtl, "memory-records-ttl", 24*time.Hour, `How long to keep data records in memory about inactive streams`)

// Views client options
fs.StringVar(&cli.viewsOpts.Livepeer.Server, "livepeer-api-server", "localhost:3004", "Base URL for the Livepeer API")
fs.StringVar(&cli.viewsOpts.Livepeer.AccessToken, "livepeer-access-token", "", "Access token for Livepeer API")
fs.StringVar(&cli.viewsOpts.Prometheus.Address, "prometheus-address", "", "Address of the Prometheus API")

flag.Set("logtostderr", "true")
glogVFlag := flag.Lookup("v")
verbosity := fs.Int("v", 0, "Log verbosity {0-10}")
Expand Down Expand Up @@ -133,8 +141,13 @@ func Run(build BuildFlags) {
glog.Fatalf("Error starting health core. err=%q", err)
}

views, err := views.NewClient(cli.viewsOpts)
if err != nil {
glog.Fatalf("Error creating views client. err=%q", err)
}

glog.Info("Starting server...")
err = api.ListenAndServe(ctx, cli.serverOpts, healthcore)
err = api.ListenAndServe(ctx, cli.serverOpts, healthcore, views)
if err != nil {
glog.Fatalf("Error starting api server. err=%q", err)
}
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/livepeer/livepeer-data
go 1.16

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/glog v0.0.0-20210429001901-424d2337a529
github.com/golang/glog v1.0.0
github.com/google/uuid v1.3.0
github.com/julienschmidt/httprouter v1.3.0
github.com/livepeer/go-api-client v0.2.8
github.com/peterbourgon/ff v1.7.0
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.37.0
github.com/rabbitmq/amqp091-go v1.1.0
github.com/rabbitmq/rabbitmq-stream-go-client v1.0.0-rc12
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
Loading

0 comments on commit b2a1cac

Please sign in to comment.