diff --git a/go.mod b/go.mod index 747e884..9db4152 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.4.0-alpha github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 3a3e33b..d2b178a 100644 --- a/go.sum +++ b/go.sum @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee h1:onnzrn5jabO3jDLPo2193Ql6YMRyDWDx9K834Bfi8V0= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029162707-1398399a24ee/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00 h1:65aiWEWp8ayWm64aCfa7nOoNSjsiX6zXs1VBadJ/JhY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104034549-4b92cf27ff00/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg= diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index d63548e..c22c42a 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -41,6 +41,8 @@ const ( PipelineUID string = "pipeline_uid" PipelineReleaseID string = "pipeline_release_id" PipelineReleaseUID string = "pipeline_release_uid" + ModelID string = "model_id" + ModelUID string = "model_uid" TriggerMode string = "trigger_mode" Status string = "status" ) diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index 9d762c3..8278e67 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -979,6 +979,70 @@ func (h *PublicHandler) ListPipelineTriggerTableRecords(ctx context.Context, req return &resp, nil } +func (h *PublicHandler) ListModelTriggerTableRecords(ctx context.Context, req *mgmtPB.ListModelTriggerTableRecordsRequest) (*mgmtPB.ListModelTriggerTableRecordsResponse, error) { + + eventName := "ListModelTriggerTableRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + pbUser, err := h.Service.GetUserByUIDAdmin(ctx, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ + filtering.DeclareStandardFunctions(), + filtering.DeclareIdent(constant.Start, filtering.TypeTimestamp), + filtering.DeclareIdent(constant.Stop, filtering.TypeTimestamp), + filtering.DeclareIdent(strcase.ToLowerCamel(constant.OwnerName), filtering.TypeString), + filtering.DeclareIdent(strcase.ToLowerCamel(constant.ModelID), filtering.TypeString), + filtering.DeclareIdent(strcase.ToLowerCamel(constant.ModelUID), filtering.TypeString), + }...) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + filter, err := filtering.ParseFilter(req, declarations) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + modelTriggerTableRecords, totalSize, nextPageToken, err := h.Service.ListModelTriggerTableRecords(ctx, pbUser, int64(req.GetPageSize()), req.GetPageToken(), filter) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp := mgmtPB.ListModelTriggerTableRecordsResponse{ + ModelTriggerTableRecords: modelTriggerTableRecords, + NextPageToken: nextPageToken, + TotalSize: int32(totalSize), + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + uuid.FromStringOrNil(*pbUser.Uid), + eventName, + custom_otel.SetEventResult(fmt.Sprintf("Total records retrieved: %v", totalSize)), + ))) + + return &resp, nil +} + // ListPipelineTriggerChartRecords returns a timeline of a requester's pipeline // trigger count. func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) { diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 313c2e2..320d7a0 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -34,6 +34,7 @@ var defaultAggregationWindow = time.Hour.Nanoseconds() type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) + QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) @@ -121,6 +122,192 @@ func AggregationWindowOffset(t time.Time) time.Duration { return t.Sub(startOfDay) } +func (i *influxDB) QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, + pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord, + totalSize int64, nextPageToken string, err error) { + + logger, _ := logger.GetZapLogger(ctx) + + if pageSize == 0 { + pageSize = DefaultPageSize + } else if pageSize > MaxPageSize { + pageSize = MaxPageSize + } + + start := time.Time{}.Format(time.RFC3339Nano) + stop := time.Now().Format(time.RFC3339Nano) + mostRecetTimeFilter := time.Now().Format(time.RFC3339Nano) + + // TODO: validate owner uid from token + if pageToken != "" { + mostRecetTime, _, err := paginate.DecodeToken(pageToken) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid page token: %s", err.Error()) + } + mostRecetTime = mostRecetTime.Add(time.Duration(-1)) + mostRecetTimeFilter = mostRecetTime.Format(time.RFC3339Nano) + } + + // TODO: design better filter expression to flux transpiler + expr, err := i.transpileFilter(filter) + if err != nil { + return nil, 0, "", status.Error(codes.Internal, err.Error()) + } + + if expr != "" { + exprs := strings.Split(expr, "&&") + for i, expr := range exprs { + if strings.HasPrefix(expr, constant.Start) { + start = strings.Split(expr, "@")[1] + exprs[i] = "" + } + if strings.HasPrefix(expr, constant.Stop) { + stop = strings.Split(expr, "@")[1] + exprs[i] = "" + } + } + expr = strings.Join(exprs, "") + } + + baseQuery := fmt.Sprintf( + `base = + from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "model.trigger.v1") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + %v + %v + triggerRank = + base + |> drop( + columns: [ + "owner_uid", + "trigger_mode", + "compute_time_duration", + "model_trigger_id", + "status", + ], + ) + |> group(columns: ["model_uid"]) + |> map(fn: (r) => ({r with trigger_time: time(v: r.trigger_time)})) + |> sort(columns: ["trigger_time"], desc: true) + |> first(column: "trigger_time") + |> rename(columns: {trigger_time: "most_recent_trigger_time"}) + triggerCount = + base + |> drop( + columns: ["owner_uid", "trigger_mode", "compute_time_duration", "model_trigger_id"], + ) + |> group(columns: ["model_uid", "status"]) + |> count(column: "trigger_time") + |> rename(columns: {trigger_time: "trigger_count"}) + |> group(columns: ["model_uid"]) + triggerTable = + join(tables: {t1: triggerRank, t2: triggerCount}, on: ["model_uid"]) + |> group() + |> pivot( + rowKey: ["model_uid", "most_recent_trigger_time"], + columnKey: ["status"], + valueColumn: "trigger_count", + ) + |> filter( + fn: (r) => r["most_recent_trigger_time"] < time(v: %v) + ) + nameMap = + base + |> keep(columns: ["trigger_time", "model_id", "model_uid"]) + |> group(columns: ["model_uid"]) + |> top(columns: ["trigger_time"], n: 1) + |> drop(columns: ["trigger_time"]) + |> group() + join(tables: {t1: triggerTable, t2: nameMap}, on: ["model_uid"])`, + i.bucket, + start, + stop, + ownerQueryString, + expr, + mostRecetTimeFilter, + ) + + query := fmt.Sprintf( + `%v + |> group() + |> sort(columns: ["most_recent_trigger_time"], desc: true) + |> limit(n: %v)`, + baseQuery, + pageSize, + ) + + totalQuery := fmt.Sprintf( + `%v + |> group() + |> count(column: "model_uid")`, + baseQuery, + ) + + var lastTimestamp time.Time + + result, err := i.api.Query(ctx, query) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } else { + // Iterate over query response + for result.Next() { + // Notice when group key has changed + if result.TableChanged() { + logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) + } + + tableRecord := &mgmtpb.ModelTriggerTableRecord{} + + if v, match := result.Record().ValueByKey(constant.ModelID).(string); match { + tableRecord.ModelId = v + } + if v, match := result.Record().ValueByKey(constant.ModelUID).(string); match { + tableRecord.ModelUid = v + } + if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_COMPLETED.String()).(int64); match { + tableRecord.TriggerCountCompleted = int32(v) + } + if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_ERRORED.String()).(int64); match { + tableRecord.TriggerCountErrored = int32(v) + } + + records = append(records, tableRecord) + } + + // Check for an error + if result.Err() != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } + if result.Record() == nil { + return nil, 0, "", nil + } + + if v, match := result.Record().ValueByKey("most_recent_trigger_time").(time.Time); match { + lastTimestamp = v + } + } + + var total int64 + totalQueryResult, err := i.api.Query(ctx, totalQuery) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid total query: %s", err.Error()) + } else { + if totalQueryResult.Next() { + total = totalQueryResult.Record().ValueByKey(constant.ModelUID).(int64) + } + } + + if int64(len(records)) < total { + pageToken = paginate.EncodeToken(lastTimestamp, owner) + } else { + pageToken = "" + } + + return records, int64(len(records)), pageToken, nil +} + func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) { logger, _ := logger.GetZapLogger(ctx) diff --git a/pkg/service/metric.go b/pkg/service/metric.go index 7eae95b..01afd82 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -144,6 +144,21 @@ func (s *service) ListPipelineTriggerTableRecords(ctx context.Context, owner *mg return pipelineTriggerTableRecords, ps, pt, nil } +func (s *service) ListModelTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.ModelTriggerTableRecord, int64, string, error) { + + ownerUID, _, _, ownerQueryString, filter, err := s.checkPipelineOwnership(ctx, filter, owner) + if err != nil { + return []*mgmtpb.ModelTriggerTableRecord{}, 0, "", err + } + + modelTriggerTableRecords, ps, pt, err := s.influxDB.QueryModelTriggerTableRecords(ctx, *ownerUID, ownerQueryString, pageSize, pageToken, filter) + if err != nil { + return nil, 0, "", err + } + + return modelTriggerTableRecords, ps, pt, nil +} + func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) { ownerUID, ownerID, ownerType, ownerQueryString, filter, err := s.checkPipelineOwnership(ctx, filter, owner) diff --git a/pkg/service/service.go b/pkg/service/service.go index 380cbfd..919c19d 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -73,6 +73,7 @@ type Service interface { ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) + ListModelTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.ModelTriggerTableRecord, int64, string, error) ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)