diff --git a/amqp/amqp.go b/amqp/amqp.go index fb5d35b..c7b89fc 100644 --- a/amqp/amqp.go +++ b/amqp/amqp.go @@ -80,6 +80,8 @@ func (a *AMQP) recv(context context.Context, delivery amqp.Delivery) { err error ) + var log = log.WithContext(context) + if err = delivery.Ack(false); err != nil { log.Error(err) return @@ -113,7 +115,7 @@ func (a *AMQP) recv(context context.Context, delivery amqp.Delivery) { } func (a *AMQP) Send(context context.Context, routingKey string, data []byte) error { - log = log.WithFields(logrus.Fields{"context": "sending usage to QMS"}) + var log = log.WithFields(logrus.Fields{"context": "sending usage to QMS"}).WithContext(context) log.Debugf("routing key: %s, message: %s", routingKey, string(data)) return a.client.PublishContext(context, routingKey, data) } diff --git a/go.mod b/go.mod index b8143d6..ecd95e6 100644 --- a/go.mod +++ b/go.mod @@ -16,11 +16,12 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/spf13/viper v1.10.1 github.com/streadway/amqp v1.0.1-0.20200716223359-e6b33f460591 + github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.12 github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.10 github.com/uptrace/opentelemetry-go-extra/otelsqlx v0.1.10 go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.30.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.30.0 - go.opentelemetry.io/otel v1.6.1 + go.opentelemetry.io/otel v1.6.3 go.uber.org/multierr v1.7.0 golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f ) @@ -48,17 +49,18 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.2.0 // indirect + github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.12 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.6.1 // indirect go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect go.opentelemetry.io/otel/metric v0.27.0 // indirect go.opentelemetry.io/otel/sdk v1.6.1 // indirect - go.opentelemetry.io/otel/trace v1.6.1 // indirect + go.opentelemetry.io/otel/trace v1.6.3 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f // indirect + golang.org/x/sys v0.0.0-20220412071739-889880a91fd5 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect diff --git a/go.sum b/go.sum index 260f666..69d9b56 100644 --- a/go.sum +++ b/go.sum @@ -447,10 +447,14 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.12 h1:uzbsxzCiefvpRUqYG3RbqlUuN7iR9gNd0vyuvQcb3ac= +github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.12/go.mod h1:a6J7AVBZ5vJ6+Sn5RTg5gqWWYBEWmLyFVIfsvAp8zzU= github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.10 h1:2eXYzpRs3c1dA1OcLUQjWCx3yUOc4MJqdnzD4n3wyjM= github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.10/go.mod h1:SVTZcEiaaEsE84gE7dYuteSc4oklkYHIFE4EBu+DiNQ= github.com/uptrace/opentelemetry-go-extra/otelsqlx v0.1.10 h1:BPTVEltIeRnBAx/qsHagzDemeZp/uht/mfHBb06NTKg= github.com/uptrace/opentelemetry-go-extra/otelsqlx v0.1.10/go.mod h1:6J9/PW7NxLiPoPBsX5k0QungPWLaz9JQON4iPn7uBWA= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.12 h1:hNqzVQzweP68oncrKGvEsi43quSHggvv39b62GLw8RY= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.12/go.mod h1:BADu9LMnBZF53MQv8VtmiIDF97iR9VvatXiyueaAzbY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= @@ -482,26 +486,31 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.30.0 h1:UWZ4BLz go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.30.0/go.mod h1:OoQiI4C9O38fTJWSHQjy6qCWqRli3FFV8c1oeiA9Rs8= go.opentelemetry.io/contrib/propagators/b3 v1.5.0 h1:Ahvlf1i67AuEhWITkjMCl5JGeIdnPYDf8EqssB5EBdo= go.opentelemetry.io/contrib/propagators/b3 v1.5.0/go.mod h1:2NjfjR2KnKEtTOYpasfLbRFO9Tt0z3Wuq8/N5C1BPHY= +go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU= go.opentelemetry.io/otel v1.4.0/go.mod h1:jeAqMFKy2uLIxCtKxoFj0FAL5zAPKQagc3+GtBWakzk= go.opentelemetry.io/otel v1.4.1/go.mod h1:StM6F/0fSwpd8dKWDCdRr7uRvEPYdW0hBSlbdTiUde4= go.opentelemetry.io/otel v1.5.0/go.mod h1:Jm/m+rNp/z0eqJc74H7LPwQ3G87qkU/AnnAydAjSAHk= go.opentelemetry.io/otel v1.6.0/go.mod h1:bfJD2DZVw0LBxghOTlgnlI0CV3hLDu9XF/QKOUXMTQQ= -go.opentelemetry.io/otel v1.6.1 h1:6r1YrcTenBvYa1x491d0GGpTVBsNECmrc/K6b+zDeis= go.opentelemetry.io/otel v1.6.1/go.mod h1:blzUabWHkX6LJewxvadmzafgh/wnvBSDBdOuwkAtrWQ= +go.opentelemetry.io/otel v1.6.3 h1:FLOfo8f9JzFVFVyU+MSRJc2HdEAXQgm7pIv2uFKRSZE= +go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI= go.opentelemetry.io/otel/exporters/jaeger v1.6.1 h1:7xuwXr3qUWq48Chyuq+VvomV3KjXZLd5seQwg83s/sU= go.opentelemetry.io/otel/exporters/jaeger v1.6.1/go.mod h1:Cu6mKJ+LLTPuOBX830xM4wVKIsVpHSXa50uN7aAxraQ= go.opentelemetry.io/otel/internal/metric v0.27.0 h1:9dAVGAfFiiEq5NVB9FUJ5et+btbDQAUIJehJ+ikyryk= go.opentelemetry.io/otel/internal/metric v0.27.0/go.mod h1:n1CVxRqKqYZtqyTh9U/onvKapPGv7y/rpyOTI+LFNzw= go.opentelemetry.io/otel/metric v0.27.0 h1:HhJPsGhJoKRSegPQILFbODU56NS/L1UE4fS1sC5kIwQ= go.opentelemetry.io/otel/metric v0.27.0/go.mod h1:raXDJ7uP2/Jc0nVZWQjJtzoyssOYWu/+pjZqRzfvZ7g= +go.opentelemetry.io/otel/sdk v1.0.1/go.mod h1:HrdXne+BiwsOHYYkBE5ysIcv2bvdZstxzmCQhxTcZkI= go.opentelemetry.io/otel/sdk v1.6.1 h1:ZmcNyMhcuAYIb/Nr6QhBPTMopMTbov/47wHt1gibkoY= go.opentelemetry.io/otel/sdk v1.6.1/go.mod h1:IVYrddmFZ+eJqu2k38qD3WezFR2pymCzm8tdxyh3R4E= +go.opentelemetry.io/otel/trace v1.0.1/go.mod h1:5g4i4fKLaX2BQpSBsxw8YYcgKpMMSW3x7ZTuYBr3sUk= go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE= go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc= go.opentelemetry.io/otel/trace v1.5.0/go.mod h1:sq55kfhjXYr1zVSyexg0w1mpa03AYXR5eyTkB9NPPdE= go.opentelemetry.io/otel/trace v1.6.0/go.mod h1:qs7BrU5cZ8dXQHBGxHMOxwME/27YH2qEp4/+tZLLwJE= -go.opentelemetry.io/otel/trace v1.6.1 h1:f8c93l5tboBYZna1nWk0W9DYyMzJXDWdZcJZ0Kb400U= go.opentelemetry.io/otel/trace v1.6.1/go.mod h1:RkFRM1m0puWIq10oxImnGEduNBzxiN7TXluRBtE+5j0= +go.opentelemetry.io/otel/trace v1.6.3 h1:IqN4L+5b0mPNjdXIiZ90Ni4Bl5BRkDQywePLWemd9bc= +go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= @@ -709,8 +718,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f h1:rlezHXNlxYWvBCzNses9Dlc7nGFaNMJeqLolcmQSSZY= -golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412071739-889880a91fd5 h1:NubxfvTRuNb4RVzWrIDAUzUvREH1HkCD4JjyQTSG9As= +golang.org/x/sys v0.0.0-20220412071739-889880a91fd5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/amqp.go b/internal/amqp.go index b41bb10..9a0e40f 100644 --- a/internal/amqp.go +++ b/internal/amqp.go @@ -60,7 +60,7 @@ func (a *App) SendTotal(context context.Context, username string) error { func (a *App) SendTotalCallback() worker.MessageSender { return func(context context.Context, workItem *db.CPUUsageWorkItem) { if err := a.SendTotal(context, workItem.CreatedBy); err != nil { - log.Error(err) + log.WithContext(context).Error(err) } } } diff --git a/internal/events.go b/internal/events.go index a726dc6..39ed843 100644 --- a/internal/events.go +++ b/internal/events.go @@ -14,6 +14,7 @@ import ( func (a *App) AdminListEvents(c echo.Context) error { context := c.Request().Context() + log = log.WithContext(context) d := db.New(a.database) results, err := d.ListEvents(context) if err == sql.ErrNoRows || len(results) == 0 { @@ -27,6 +28,7 @@ func (a *App) AdminListEvents(c echo.Context) error { func (a *App) AdminListAllUserEventsHandler(c echo.Context) error { context := c.Request().Context() + log = log.WithContext(context) user := c.Param("username") if user == "" { @@ -48,6 +50,7 @@ func (a *App) AdminListAllUserEventsHandler(c echo.Context) error { func (a *App) AdminGetEventHandler(c echo.Context) error { context := c.Request().Context() + log = log.WithContext(context) id := c.Param("id") if id == "" { @@ -68,6 +71,7 @@ func (a *App) AdminGetEventHandler(c echo.Context) error { func (a *App) AdminUpdateEventHandler(c echo.Context) error { context := c.Request().Context() + log = log.WithContext(context) id := c.Param("id") if id == "" { @@ -104,6 +108,7 @@ func (a *App) AdminUpdateEventHandler(c echo.Context) error { func (a *App) AdminDeleteEventHandler(c echo.Context) error { context := c.Request().Context() + log = log.WithContext(context) id := c.Param("id") if id == "" { diff --git a/internal/summary.go b/internal/summary.go index d13ec1a..a579217 100644 --- a/internal/summary.go +++ b/internal/summary.go @@ -115,7 +115,6 @@ func (a *App) GetUserSummary(c echo.Context) error { duOK = true planOK = true - log = log.WithFields(logrus.Fields{"context": "get user summary"}) context := c.Request().Context() user := c.Param("username") @@ -124,7 +123,7 @@ func (a *App) GetUserSummary(c echo.Context) error { } user = a.FixUsername(user) - log = log.WithFields(logrus.Fields{"user": user}) + log = log.WithFields(logrus.Fields{"context": "get user summary", "user": user}).WithContext(context) cpuCtx, cpuSpan := otel.Tracer(otelName).Start(context, "summary: CPU hours") @@ -142,7 +141,7 @@ func (a *App) GetUserSummary(c echo.Context) error { } summary.Errors = append(summary.Errors, cpuHoursError) } else if err != nil { - log.Error(err) + log.WithContext(cpuCtx).Error(err) cpuHours = &db.CPUHours{} cpuHoursError := APIError{ Field: "cpu_usage", diff --git a/internal/totals.go b/internal/totals.go index 4b25e49..58344e0 100644 --- a/internal/totals.go +++ b/internal/totals.go @@ -21,6 +21,7 @@ func (a *App) GreetingHandler(context echo.Context) error { // CurrentCPUHours looks up the total CPU hours for the current recording period. func (a *App) CurrentCPUHoursHandler(c echo.Context) error { context := c.Request().Context() + var log = log.WithContext(context) user := c.Param("username") if user == "" { @@ -45,6 +46,7 @@ func (a *App) CurrentCPUHoursHandler(c echo.Context) error { // AllCPUHoursHandler returns all of the total CPU hours totals, regardless of recording period. func (a *App) AllCPUHoursHandler(c echo.Context) error { context := c.Request().Context() + var log = log.WithContext(context) user := c.Param("username") if user == "" { @@ -71,6 +73,7 @@ func (a *App) AllCPUHoursHandler(c echo.Context) error { // AdminAllCurrentCPUHoursTotalsHandler looks up all of the total CPU hours totals for all users. func (a *App) AdminAllCurrentCPUHoursHandler(c echo.Context) error { context := c.Request().Context() + var log = log.WithContext(context) d := db.New(a.database) results, err := d.AdminAllCurrentCPUHours(context) @@ -87,6 +90,7 @@ func (a *App) AdminAllCurrentCPUHoursHandler(c echo.Context) error { // AdminAllCPUHoursTotalsHandler returns all of the total CPU hours totals for all recording periods, regardless of user. func (a *App) AdminAllCPUHoursTotalsHandler(c echo.Context) error { context := c.Request().Context() + var log = log.WithContext(context) d := db.New(a.database) results, err := d.AdminAllCPUHours(context) @@ -111,7 +115,7 @@ func (a *App) AdminRecalculateCPUHoursTotalHandler(c echo.Context) error { ) context := c.Request().Context() - var log = log.WithFields(logrus.Fields{"context": "recalulcating cpu hours total"}) + var log = log.WithFields(logrus.Fields{"context": "recalulcating cpu hours total"}).WithContext(context) // Make sure the username has the domain suffix attached. user := c.Param("username") @@ -222,7 +226,7 @@ func (a *App) AdminUsersWithCalculableAnalysesHandler(c echo.Context) error { ) context := c.Request().Context() - var log = log.WithFields(logrus.Fields{"context": "users with calculable analyses"}) + var log = log.WithFields(logrus.Fields{"context": "users with calculable analyses"}).WithContext(context) d := db.New(a.database) @@ -242,7 +246,7 @@ func (a *App) AdminResendTotalToQMSHandler(c echo.Context) error { var err error context := c.Request().Context() - var log = log.WithFields(logrus.Fields{"context": "resend total to QMS"}) + var log = log.WithFields(logrus.Fields{"context": "resend total to QMS"}).WithContext(context) username := c.Param("username") if username == "" { diff --git a/internal/updates.go b/internal/updates.go index 00b0175..49e8dd4 100644 --- a/internal/updates.go +++ b/internal/updates.go @@ -29,6 +29,7 @@ func totalReplacer(_ *apd.Decimal, workItem *db.CPUUsageWorkItem) (*apd.Decimal, } func (a *App) updateCPUHours(context context.Context, workItem *db.CPUUsageWorkItem, updater totalUpdaterFn) error { + var log = log.WithContext(context) // Open a transaction. tx, err := a.database.Beginx() if err != nil { @@ -136,6 +137,7 @@ func (a *App) totalHandler(c echo.Context, eventType db.EventType) error { } func (a *App) AddToTotalHandler(c echo.Context) error { + var log = log.WithContext(c.Request().Context()) err := a.totalHandler(c, db.CPUHoursAdd) if err != nil { log.Error(err) @@ -144,6 +146,7 @@ func (a *App) AddToTotalHandler(c echo.Context) error { } func (a *App) SubtractFromTotalHandler(c echo.Context) error { + var log = log.WithContext(c.Request().Context()) err := a.totalHandler(c, db.CPUHoursSubtract) if err != nil { log.Error(err) @@ -152,6 +155,7 @@ func (a *App) SubtractFromTotalHandler(c echo.Context) error { } func (a *App) ResetTotalHandler(c echo.Context) error { + var log = log.WithContext(c.Request().Context()) err := a.totalHandler(c, db.CPUHoursReset) if err != nil { log.Error(err) diff --git a/main.go b/main.go index ef1ce02..89c9448 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "golang.org/x/net/context" "github.com/cyverse-de/go-mod/otelutils" + "github.com/uptrace/opentelemetry-go-extra/otellogrus" "github.com/uptrace/opentelemetry-go-extra/otelsql" "github.com/uptrace/opentelemetry-go-extra/otelsqlx" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" @@ -41,7 +42,7 @@ func getHandler(dbClient *sqlx.DB) amqp.HandlerFn { return func(context context.Context, externalID string, state messaging.JobState) { var err error - log = log.WithFields(logrus.Fields{"externalID": externalID}) + log = log.WithFields(logrus.Fields{"externalID": externalID}).WithContext(context) if state == messaging.FailedState || state == messaging.SucceededState { log.Debug("calculating CPU hours for analysis") @@ -80,6 +81,9 @@ func main() { ) flag.Parse() + + logrus.AddHook(otellogrus.NewHook()) + logging.SetupLogging(*logLevel) var tracerCtx, cancel = context.WithCancel(context.Background()) diff --git a/worker/worker.go b/worker/worker.go index 33a9cee..6210bc3 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -63,6 +63,10 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, err error database *db.Database ) + ctx, span := otel.Tracer(otelName).Start(context, "worker.New") + defer span.End() + + var log = log.WithContext(ctx) worker := Worker{ ClaimLifetime: config.ClaimLifetime, @@ -75,18 +79,21 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, database = db.New(worker.db) - worker.ID, err = database.RegisterWorker(context, worker.Name, time.Now().Add(config.ExpirationInterval)) + worker.ID, err = database.RegisterWorker(ctx, worker.Name, time.Now().Add(config.ExpirationInterval)) if err != nil { return nil, err } worker.Scheduler = gocron.NewScheduler(time.UTC) + // use passed-in context, not the span context, for these scheduled jobs worker.Scheduler.Every(config.RefreshInterval).Do(func() { // nolint:errcheck - log := log.WithFields(logrus.Fields{"context": "refreshing worker registration"}) + ctx, span := otel.Tracer(otelName).Start(context, "worker refresh") + defer span.End() + log := log.WithFields(logrus.Fields{"context": "refreshing worker registration"}).WithContext(ctx) log.Info("start refreshing worker registrations") - newTime, err := database.RefreshWorkerRegistration(context, worker.ID, worker.Name, config.ExpirationInterval) + newTime, err := database.RefreshWorkerRegistration(ctx, worker.ID, worker.Name, config.ExpirationInterval) if err != nil { log.Error(err) return @@ -95,10 +102,12 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, }) worker.Scheduler.Every(config.WorkerPurgeInterval).Do(func() { // nolint:errcheck - log := log.WithFields(logrus.Fields{"context": "purging expired workers"}) + ctx, span := otel.Tracer(otelName).Start(context, "worker purge") + defer span.End() + log := log.WithFields(logrus.Fields{"context": "purging expired workers"}).WithContext(ctx) log.Info("start purging expired workers") - numExpired, err := database.PurgeExpiredWorkers(context) + numExpired, err := database.PurgeExpiredWorkers(ctx) if err != nil { log.Error(err) return @@ -106,7 +115,7 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, log.Infof("purged %d expired workers", numExpired) log.Info("resetting work claims for inactive workers") - resetClaims, err := database.ResetWorkClaimsForInactiveWorkers(context) + resetClaims, err := database.ResetWorkClaimsForInactiveWorkers(ctx) if err != nil { log.Error(err) return @@ -115,10 +124,12 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, }) worker.Scheduler.Every(config.WorkSeekerPurgeInterval).Do(func() { // nolint:errcheck - log := log.WithFields(logrus.Fields{"context": "purging expired work seekers"}) + ctx, span := otel.Tracer(otelName).Start(context, "worker seeker purge") + defer span.End() + log := log.WithFields(logrus.Fields{"context": "purging expired work seekers"}).WithContext(ctx) log.Info("start purging expired work seekers") - numExpiredWorkers, err := database.PurgeExpiredWorkSeekers(context) + numExpiredWorkers, err := database.PurgeExpiredWorkSeekers(ctx) if err != nil { log.Error(err) return @@ -127,10 +138,12 @@ func New(context context.Context, config *Config, dbAccessor *sqlx.DB) (*Worker, }) worker.Scheduler.Every(config.WorkClaimPurgeInterval).Do(func() { // nolint:errcheck - log := log.WithFields(logrus.Fields{"context": "purging expired work claims"}) + ctx, span := otel.Tracer(otelName).Start(context, "work claim purge") + defer span.End() + log := log.WithFields(logrus.Fields{"context": "purging expired work claims"}).WithContext(ctx) log.Info("start purging expired work claims") - numWorkClaims, err := database.PurgeExpiredWorkClaims(context) + numWorkClaims, err := database.PurgeExpiredWorkClaims(ctx) if err != nil { log.Error(err) return @@ -154,7 +167,7 @@ func (w *Worker) Start(context context.Context) { span.SetAttributes(attribute.String("worker.name", w.Name), attribute.String("worker.id", w.ID)) now := time.Now() - log := log.WithFields(logrus.Fields{"context": "processing work items"}) + log := log.WithFields(logrus.Fields{"context": "processing work items"}).WithContext(iterationCtx) log.Debugf("start looking for work items") if err = database.GettingWork(iterationCtx, w.ID, now.Add(w.WorkSeekingLifetime)); err != nil { @@ -251,7 +264,7 @@ func (w *Worker) Start(context context.Context) { // something in a transaction, which will hopefully prevent race conditions with the code that // purges expired work claims and workers. func (w *Worker) claimWorkItem(context context.Context, workItem *db.CPUUsageWorkItem) error { - log := log.WithFields(logrus.Fields{"context": "claiming work item"}) + log := log.WithFields(logrus.Fields{"context": "claiming work item"}).WithContext(context) tx, err := w.db.Beginx() if err != nil { @@ -300,7 +313,7 @@ func (w *Worker) claimWorkItem(context context.Context, workItem *db.CPUUsageWor // getting work starting work in a transaction, which should avoid race conditions // in the clean up functions that run periodically. func (w *Worker) transitionToWorkingState(context context.Context) error { - log := log.WithFields(logrus.Fields{"context": "claiming work item"}) + log := log.WithFields(logrus.Fields{"context": "claiming work item"}).WithContext(context) tx, err := w.db.Beginx() if err != nil { @@ -351,7 +364,7 @@ func (w *Worker) transitionToWorkingState(context context.Context) error { } func (w *Worker) finishWorking(context context.Context, workItem *db.CPUUsageWorkItem) error { - log := logging.Log.WithFields(logrus.Fields{"context": "mark work item finished"}) + log := logging.Log.WithFields(logrus.Fields{"context": "mark work item finished"}).WithContext(context) // Use a transaction here to avoid causing a race condition that // could cause the worker to get purged between steps. diff --git a/worker/workitems.go b/worker/workitems.go index 3c4475a..d79a7a7 100644 --- a/worker/workitems.go +++ b/worker/workitems.go @@ -15,6 +15,7 @@ import ( type totalUpdater func(*apd.Decimal, *apd.Decimal) (*apd.Decimal, error) func (w *Worker) updateCPUHoursTotal(context context.Context, log *logrus.Entry, workItem *db.CPUUsageWorkItem, updateFn totalUpdater) error { + log = log.WithContext(context) tx, err := w.db.Beginx() if err != nil { if rerr := tx.Rollback(); rerr != nil { @@ -117,7 +118,7 @@ func (w *Worker) updateCPUHoursTotal(context context.Context, log *logrus.Entry, } func (w *Worker) AddCPUHours(context context.Context, workItem *db.CPUUsageWorkItem) error { - log := logging.Log.WithFields(logrus.Fields{"context": "adding CPU hours"}) + log := logging.Log.WithFields(logrus.Fields{"context": "adding CPU hours"}).WithContext(context) return w.updateCPUHoursTotal(context, log, workItem, func(current *apd.Decimal, add *apd.Decimal) (*apd.Decimal, error) { total := apd.New(0, 0) _, err := apd.BaseContext.Add(total, current, add) @@ -129,7 +130,7 @@ func (w *Worker) AddCPUHours(context context.Context, workItem *db.CPUUsageWorkI } func (w *Worker) SubtractCPUHours(context context.Context, workItem *db.CPUUsageWorkItem) error { - log := logging.Log.WithFields(logrus.Fields{"context": "subtracting CPU hours"}) + log := logging.Log.WithFields(logrus.Fields{"context": "subtracting CPU hours"}).WithContext(context) return w.updateCPUHoursTotal(context, log, workItem, func(current *apd.Decimal, subtract *apd.Decimal) (*apd.Decimal, error) { total := apd.New(0, 0) _, err := apd.BaseContext.WithPrecision(15).Sub(total, current, subtract) @@ -141,7 +142,7 @@ func (w *Worker) SubtractCPUHours(context context.Context, workItem *db.CPUUsage } func (w *Worker) ResetCPUHours(context context.Context, workItem *db.CPUUsageWorkItem) error { - log := logging.Log.WithFields(logrus.Fields{"context": "resetting CPU hours"}) + log := logging.Log.WithFields(logrus.Fields{"context": "resetting CPU hours"}).WithContext(context) return w.updateCPUHoursTotal(context, log, workItem, func(_ *apd.Decimal, newValue *apd.Decimal) (*apd.Decimal, error) { return newValue, nil })