From afc27e8dd5e655a595e4573a067b663e8c0ac7b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Vall=C3=A9s?= <3977183+jvallesm@users.noreply.github.com> Date: Tue, 26 Nov 2024 16:55:06 +0100 Subject: [PATCH] feat(metric): implement new pipeline dashboard endpoints (#238) Because - The new dashboard introduces changes in the metrics endpoints. This commit - Applies contracts defined in https://github.com/instill-ai/protobufs/pull/435, keeping the API backwards-compatible. --- go.mod | 2 +- go.sum | 4 +- integration-test/grpc-public-user.js | 12 +- .../proto/core/mgmt/v1beta/metric.proto | 203 +++++----- .../mgmt/v1beta/mgmt_public_service.proto | 350 +++++++++++++++--- pkg/handler/publichandler.go | 74 +++- pkg/repository/influx.go | 199 +++++++--- pkg/service/metric.go | 83 ++++- pkg/service/service.go | 6 +- 9 files changed, 731 insertions(+), 202 deletions(-) diff --git a/go.mod b/go.mod index dd6243a..5b5e547 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104132014-1bf251739feb + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241120170237-ee31d10bc9c8 github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.4.0-alpha github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 8c9fcdf..b68c15c 100644 --- a/go.sum +++ b/go.sum @@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104132014-1bf251739feb h1:tJ7/VFdviz4bs3IVkuKwjrs/sltNPb1a3xVx3u3T7hA= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241104132014-1bf251739feb/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241120170237-ee31d10bc9c8 h1:Mq87LDBN4fmpN7tD/UMTMahqR/FupEHVIPCPqLZh4XI= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241120170237-ee31d10bc9c8/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg= diff --git a/integration-test/grpc-public-user.js b/integration-test/grpc-public-user.js index 420af02..09f7228 100644 --- a/integration-test/grpc-public-user.js +++ b/integration-test/grpc-public-user.js @@ -293,15 +293,15 @@ export function CheckPublicMetrics(header) { }); group(`Management Public API: List Pipeline Trigger Chart Records`, () => { - check(client.invoke('core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords', {}, header), { - 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords status': (r) => r && r.status == grpc.StatusOK, - 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords response has pipelineTriggerChartRecords': (r) => r && r.message.pipelineTriggerChartRecords !== undefined, + check(client.invoke('core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0', {}, header), { + 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0 status': (r) => r && r.status == grpc.StatusOK, + 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0 response has pipelineTriggerChartRecords': (r) => r && r.message.pipelineTriggerChartRecords !== undefined, }); - check(client.invoke('core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords', { + check(client.invoke('core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0', { filter: `pipelineId="${pipeline_id}" AND triggerMode=MODE_SYNC`, }, header), { - 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords with filter status': (r) => r && r.status == grpc.StatusOK, - 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecords with filter response pipelineTriggerChartRecords lenght is 0': (r) => r && r.message.pipelineTriggerChartRecords.length === 0, + 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0 with filter status': (r) => r && r.status == grpc.StatusOK, + 'core.mgmt.v1beta.MgmtPublicService/ListPipelineTriggerChartRecordsV0 with filter response pipelineTriggerChartRecords lenght is 0': (r) => r && r.message.pipelineTriggerChartRecords.length === 0, }); }); diff --git a/integration-test/proto/core/mgmt/v1beta/metric.proto b/integration-test/proto/core/mgmt/v1beta/metric.proto index 20ba79b..c2862a5 100644 --- a/integration-test/proto/core/mgmt/v1beta/metric.proto +++ b/integration-test/proto/core/mgmt/v1beta/metric.proto @@ -27,55 +27,20 @@ enum Status { STATUS_ERRORED = 2; } -// ========== Pipeline endpoints - -// PipelineTriggerCount represents a pipeline execution count with some +// TriggerCount represents a execution count with some // aggregation params (e.g. trigger status). -message PipelineTriggerCount { +message TriggerCount { // Number of triggers. int32 trigger_count = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; // This field will be present when results are grouped by trigger status. optional Status status = 2 [(google.api.field_behavior) = OUTPUT_ONLY]; } -/* - // PipelineTriggerChartRecord represents a timeline of pipeline triggers. It - // contains a collection of (timestamp, count) pairs that represent the total - // pipeline triggers in a given time bucket. - // pipeline ID and time frame. - message PipelineTriggerChartRecord { - // This field will be present present when the information is grouped by pipeline. - optional string pipeline_id = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; - // 2 is reserved for the pipeline UUID. - reserved 2; - // 3 is reserved for the trigger mode. The server wasn't grouping results by this - // field. - reserved 3; - // 4 is reserved for the trigger status. The server wasn't grouping results - // by this field. - reserved 4; - // Time buckets. - repeated google.protobuf.Timestamp time_buckets = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; - // Aggregated trigger count in each time bucket. - repeated int32 trigger_counts = 6 [(google.api.field_behavior) = OUTPUT_ONLY]; - // 7 is reserved for the trigger execution duration. - reserved 7; - // 8 is reserved for the pipeline release ID. The server wasn't grouping - // results by this field. - reserved 8; - // 9 is reserved for the pipeline release UUID. The server wasn't grouping - // results by this field. - reserved 9; - // The ID of the namespace that requested the pipeline triggers. - string namespace_id = 10 [(google.api.field_behavior) = OUTPUT_ONLY]; - } -*/ - // GetPipelineTriggerCountRequest represents a request to fetch the trigger // count of a requester over a time period. message GetPipelineTriggerCountRequest { // The ID of the namespace that requested the pipeline triggers. - string namespace_id = 1 [(google.api.field_behavior) = REQUIRED]; + string requester_id = 1 [(google.api.field_behavior) = REQUIRED]; // Beginning of the time range from which the records will be fetched. // The default value is the beginning of the current day, in UTC. optional google.protobuf.Timestamp start = 2; @@ -88,47 +53,113 @@ message GetPipelineTriggerCountRequest { // trigger status. message GetPipelineTriggerCountResponse { // The trigger counts, grouped by status. - repeated PipelineTriggerCount pipeline_trigger_counts = 1; + repeated TriggerCount pipeline_trigger_counts = 1; +} + +// GetModelTriggerCountRequest represents a request to fetch the trigger +// count of a requester over a time period. +message GetModelTriggerCountRequest { + // The ID of the requester that triggered the model. + string requester_id = 1 [(google.api.field_behavior) = REQUIRED]; + // Beginning of the time range from which the records will be fetched. + // The default value is the beginning of the current day, in UTC. + optional google.protobuf.Timestamp start = 2; + // End of the time range from which the records will be fetched. + // The default value is the current timestamp. + optional google.protobuf.Timestamp stop = 3; +} + +// GetModelTriggerCountResponse contains the trigger count, grouped by +// trigger status. +message GetModelTriggerCountResponse { + // The trigger counts, grouped by status. + repeated TriggerCount model_trigger_counts = 1; +} + +// PipelineTriggerChartRecord represents a timeline of pipeline triggers. It +// contains a collection of (timestamp, count) pairs that represent the total +// pipeline triggers in a given time bucket. +// pipeline ID and time frame. +message PipelineTriggerChartRecord { + // This field will be present present when the information is grouped by pipeline. + optional string pipeline_id = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + // Time buckets. + repeated google.protobuf.Timestamp time_buckets = 2 [(google.api.field_behavior) = OUTPUT_ONLY]; + // Aggregated trigger count in each time bucket. + repeated int32 trigger_counts = 3 [(google.api.field_behavior) = OUTPUT_ONLY]; + // The ID of the namespace that requested the pipeline triggers. + string requester_id = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; +} + +// ListPipelineTriggerChartRecordsRequest represents a request to list pipeline +// trigger chart records for a given requester, grouped by time buckets. +message ListPipelineTriggerChartRecordsRequest { + // The ID of the namespace that requested the pipeline triggers. + string requester_id = 1 [(google.api.field_behavior) = REQUIRED]; + // Aggregation window. The value is a positive duration string, i.e. a + // sequence of decimal numbers, each with optional fraction and a unit + // suffix, such as "300ms", "1.5h" or "2h45m". + // The minimum (and default) window is 1h. + optional string aggregation_window = 2; + // Beginning of the time range from which the records will be fetched. + // The default value is the beginning of the current day, in UTC. + optional google.protobuf.Timestamp start = 3; + // End of the time range from which the records will be fetched. + // The default value is the current timestamp. + optional google.protobuf.Timestamp stop = 4; +} + +// ListPipelineTriggerChartRecordsResponse contains a list of pipeline trigger +// chart records. +message ListPipelineTriggerChartRecordsResponse { + // Pipeline trigger counts. Until we allow filtering or grouping by fields + // like pipeline ID, this list will contain only one element with the + // timeline of trigger counts for a given requester, regardless the pipeline + // ID, trigger mode, final status or other fields. + repeated PipelineTriggerChartRecord pipeline_trigger_chart_records = 1; } -/* - // ListPipelineTriggerChartRecordsRequest represents a request to list pipeline - // trigger chart records for a given requester, grouped by time buckets. - message ListPipelineTriggerChartRecordsRequest { - // 1 is reserved for the aggregation window in nanoseconds. This is - // deprecated in favour of an aggregation window string that represents a - // duration. - reserved 1; - // 2 is reserved for the filter. For now, this endpoint won't allow filtering - // but in the future we might implement a filter to show the trigger count of - // only certain pipelines and to group by the pipeline ID. - reserved 2; +// ModelTriggerChartRecord represents a timeline of model triggers. It +// contains a collection of (timestamp, count) pairs that represent the total +// model triggers in a given time bucket. +message ModelTriggerChartRecord { + // This field will be present present when the information is grouped by model. + optional string model_id = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + // Time buckets. + repeated google.protobuf.Timestamp time_buckets = 2 [(google.api.field_behavior) = OUTPUT_ONLY]; + // Aggregated trigger count in each time bucket. + repeated int32 trigger_counts = 3 [(google.api.field_behavior) = OUTPUT_ONLY]; + // The ID of the namespace that requested the model triggers. + string requester_id = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; +} - // The ID of the namespace that requested the pipeline triggers. - string namespace_id = 3 [(google.api.field_behavior) = REQUIRED]; - // Aggregation window. The value is a positive duration string, i.e. a - // sequence of decimal numbers, each with optional fraction and a unit - // suffix, such as "300ms", "1.5h" or "2h45m". - // The minimum (and default) window is 1h. - optional string aggregation_window = 4; - // Beginning of the time range from which the records will be fetched. - // The default value is the beginning of the current day, in UTC. - optional google.protobuf.Timestamp start = 5; - // End of the time range from which the records will be fetched. - // The default value is the current timestamp. - optional google.protobuf.Timestamp stop = 6; - } +// ListModelTriggerChartRecordsRequest represents a request to list model +// trigger metrics, aggregated by model ID and time frame. +message ListModelTriggerChartRecordsRequest { + // The ID of the namespace that requested the model triggers. + string requester_id = 1 [(google.api.field_behavior) = REQUIRED]; + // Aggregation window. The value is a positive duration string, i.e. a + // sequence of decimal numbers, each with optional fraction and a unit + // suffix, such as "300ms", "1.5h" or "2h45m". + // The minimum (and default) window is 1h. + optional string aggregation_window = 2; + // Beginning of the time range from which the records will be fetched. + // The default value is the beginning of the current day, in UTC. + optional google.protobuf.Timestamp start = 3; + // End of the time range from which the records will be fetched. + // The default value is the current timestamp. + optional google.protobuf.Timestamp stop = 4; +} - // ListPipelineTriggerChartRecordsResponse contains a list of pipeline trigger - // chart records. - message ListPipelineTriggerChartRecordsResponse { - // Pipeline trigger counts. Until we allow filtering or grouping by fields - // like pipeline ID, this list will contain only one element with the - // timeline of trigger counts for a given requester, regardless the pipeline - // ID, trigger mode, final status or other fields. - repeated PipelineTriggerChartRecord pipeline_trigger_chart_records = 1; - } -*/ +// ListModelTriggerChartRecordsResponse contains a list of model trigger +// chart records. +message ListModelTriggerChartRecordsResponse { + // Model trigger counts. Until we allow filtering or grouping by fields + // like model ID, this list will contain only one element with the + // timeline of trigger counts for a given requester, regardless the model + // ID, trigger mode, final status or other fields. + repeated ModelTriggerChartRecord model_trigger_chart_records = 1; +} // CreditConsumptionChartRecord represents a timeline of Instill Credit // consumption. It contains a collection of (timestamp, amount) pairs that @@ -174,7 +205,9 @@ message ListCreditConsumptionChartRecordsResponse { reserved 2; } +// ============================================================================= // Deprecated messages, to be removed with the new dashboard implementation. +// ============================================================================= // PipelineTriggerTableRecord contains pipeline trigger metrics, aggregated by // pipeline ID. @@ -218,9 +251,9 @@ message ListPipelineTriggerTableRecordsResponse { int32 total_size = 3; } -// ListPipelineTriggerChartRecordsRequest represents a request to list pipeline -// trigger metrics, aggregated by pipeline ID and time frame. -message ListPipelineTriggerChartRecordsRequest { +// ListPipelineTriggerChartRecordsV0Request represents a request to list +// pipeline trigger metrics, aggregated by pipeline ID and time frame. +message ListPipelineTriggerChartRecordsV0Request { // Aggregation window in nanoseconds. int32 aggregation_window = 1; // Filter can hold an [AIP-160](https://google.aip.dev/160)-compliant filter @@ -229,16 +262,16 @@ message ListPipelineTriggerChartRecordsRequest { optional string filter = 2 [(google.api.field_behavior) = OPTIONAL]; } -// ListPipelineTriggerChartRecordsResponse contains a list of pipeline trigger -// chart records. -message ListPipelineTriggerChartRecordsResponse { +// ListPipelineTriggerChartRecordsV0Response contains a list of pipeline +// trigger chart records. +message ListPipelineTriggerChartRecordsV0Response { // A list of pipeline trigger records. - repeated PipelineTriggerChartRecord pipeline_trigger_chart_records = 1; + repeated PipelineTriggerChartRecordV0 pipeline_trigger_chart_records = 1; } -// PipelineTriggerChartRecord contains pipeline trigger metrics, aggregated by -// pipeline ID and time frame. -message PipelineTriggerChartRecord { +// PipelineTriggerChartRecordV0 contains pipeline trigger metrics, aggregated +// by pipeline ID and time frame. +message PipelineTriggerChartRecordV0 { // Pipeline ID. string pipeline_id = 1; // Pipeline UUID. diff --git a/integration-test/proto/core/mgmt/v1beta/mgmt_public_service.proto b/integration-test/proto/core/mgmt/v1beta/mgmt_public_service.proto index 8f92b07..8dd0789 100644 --- a/integration-test/proto/core/mgmt/v1beta/mgmt_public_service.proto +++ b/integration-test/proto/core/mgmt/v1beta/mgmt_public_service.proto @@ -47,7 +47,13 @@ service MgmtPublicService { // Returns the details of the authenticated user. rpc GetAuthenticatedUser(GetAuthenticatedUserRequest) returns (GetAuthenticatedUserResponse) { option (google.api.http) = {get: "/v1beta/user"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "User"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Update the authenticated user @@ -61,7 +67,13 @@ service MgmtPublicService { patch: "/v1beta/user" body: "user" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "User"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List users @@ -69,7 +81,13 @@ service MgmtPublicService { // Returns a paginated list of users. rpc ListUsers(ListUsersRequest) returns (ListUsersResponse) { option (google.api.http) = {get: "/v1beta/users"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "User"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get a user @@ -77,7 +95,13 @@ service MgmtPublicService { // Returns the details of a user by their ID. rpc GetUser(GetUserRequest) returns (GetUserResponse) { option (google.api.http) = {get: "/v1beta/users/{user_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "User"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Create an organization @@ -88,7 +112,13 @@ service MgmtPublicService { post: "/v1beta/organizations" body: "organization" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Organization"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List organizations @@ -96,7 +126,13 @@ service MgmtPublicService { // Returns a paginated list of organizations. rpc ListOrganizations(ListOrganizationsRequest) returns (ListOrganizationsResponse) { option (google.api.http) = {get: "/v1beta/organizations"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Organization"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get an organization @@ -104,7 +140,13 @@ service MgmtPublicService { // Returns the organization details by its ID. rpc GetOrganization(GetOrganizationRequest) returns (GetOrganizationResponse) { option (google.api.http) = {get: "/v1beta/organizations/{organization_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Organization"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Update an organization @@ -118,7 +160,13 @@ service MgmtPublicService { patch: "/v1beta/organizations/{organization_id}" body: "organization" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Organization"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Delete an organization @@ -126,7 +174,13 @@ service MgmtPublicService { // Accesses and deletes an organization by ID. rpc DeleteOrganization(DeleteOrganizationRequest) returns (DeleteOrganizationResponse) { option (google.api.http) = {delete: "/v1beta/organizations/{organization_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Organization"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List user memberships @@ -134,7 +188,13 @@ service MgmtPublicService { // Returns the memberships of a user. rpc ListUserMemberships(ListUserMembershipsRequest) returns (ListUserMembershipsResponse) { option (google.api.http) = {get: "/v1beta/users/{user_id}/memberships"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get a user membership @@ -143,7 +203,13 @@ service MgmtPublicService { // organization. The authenticated must match the membership parent. rpc GetUserMembership(GetUserMembershipRequest) returns (GetUserMembershipResponse) { option (google.api.http) = {get: "/v1beta/users/{user_id}/memberships/{organization_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Update a user membership @@ -154,7 +220,13 @@ service MgmtPublicService { put: "/v1beta/users/{user_id}/memberships/{organization_id}" body: "membership" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Delete a user membership @@ -162,7 +234,13 @@ service MgmtPublicService { // Accesses and deletes a user membership by parent and membership IDs. rpc DeleteUserMembership(DeleteUserMembershipRequest) returns (DeleteUserMembershipResponse) { option (google.api.http) = {delete: "/v1beta/users/{user_id}/memberships/{organization_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List organization memberships @@ -170,7 +248,13 @@ service MgmtPublicService { // Returns a paginated list of the user memberships in an organization. rpc ListOrganizationMemberships(ListOrganizationMembershipsRequest) returns (ListOrganizationMembershipsResponse) { option (google.api.http) = {get: "/v1beta/organizations/{organization_id}/memberships"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get a an organization membership @@ -178,7 +262,13 @@ service MgmtPublicService { // Returns the details of a user membership within an organization. rpc GetOrganizationMembership(GetOrganizationMembershipRequest) returns (GetOrganizationMembershipResponse) { option (google.api.http) = {get: "/v1beta/organizations/{organization_id}/memberships/{user_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Uppdate an organization membership @@ -189,7 +279,13 @@ service MgmtPublicService { put: "/v1beta/organizations/{organization_id}/memberships/{user_id}" body: "membership" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Delete an organization membership @@ -197,7 +293,13 @@ service MgmtPublicService { // Deletes a user membership within an organization. rpc DeleteOrganizationMembership(DeleteOrganizationMembershipRequest) returns (DeleteOrganizationMembershipResponse) { option (google.api.http) = {delete: "/v1beta/organizations/{organization_id}/memberships/{user_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Membership"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get the subscription of the authenticated user @@ -205,7 +307,13 @@ service MgmtPublicService { // Returns the subscription details of the authenticated user. rpc GetAuthenticatedUserSubscription(GetAuthenticatedUserSubscriptionRequest) returns (GetAuthenticatedUserSubscriptionResponse) { option (google.api.http) = {get: "/v1beta/user/subscription"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Subscription"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🤝 Subscription" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get the subscription of an organization @@ -213,7 +321,13 @@ service MgmtPublicService { // Returns the subscription details of an organization. rpc GetOrganizationSubscription(GetOrganizationSubscriptionRequest) returns (GetOrganizationSubscriptionResponse) { option (google.api.http) = {get: "/v1beta/organizations/{organization_id}/subscription"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Subscription"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🤝 Subscription" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Create an API token @@ -224,7 +338,13 @@ service MgmtPublicService { post: "/v1beta/tokens" body: "token" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Token"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List API tokens @@ -232,7 +352,13 @@ service MgmtPublicService { // Returns a paginated list of the API tokens of the authenticated user. rpc ListTokens(ListTokensRequest) returns (ListTokensResponse) { option (google.api.http) = {get: "/v1beta/tokens"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Token"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get an API token @@ -240,7 +366,13 @@ service MgmtPublicService { // Returns the details of an API token. rpc GetToken(GetTokenRequest) returns (GetTokenResponse) { option (google.api.http) = {get: "/v1beta/tokens/{token_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Token"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Delete an API token @@ -248,7 +380,13 @@ service MgmtPublicService { // Deletes an API token. rpc DeleteToken(DeleteTokenRequest) returns (DeleteTokenResponse) { option (google.api.http) = {delete: "/v1beta/tokens/{token_id}"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Token"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Validate an API token @@ -256,7 +394,13 @@ service MgmtPublicService { // Validates an API token. rpc ValidateToken(ValidateTokenRequest) returns (ValidateTokenResponse) { option (google.api.http) = {post: "/v1beta/validate_token"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Token"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get the remaining Instill Credit @@ -268,7 +412,13 @@ service MgmtPublicService { // On Instill Core, this endpoint will return a 404 Not Found status. rpc GetRemainingCredit(GetRemainingCreditRequest) returns (GetRemainingCreditResponse) { option (google.api.http) = {get: "/v1beta/namespaces/{namespace_id}/credit"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Credit"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🤝 Subscription" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Check if a namespace is in use @@ -280,15 +430,13 @@ service MgmtPublicService { post: "/v1beta/check-namespace" body: "*" }; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Utils"}; - } - - // List pipeline triggers - // - // Returns a paginated list of pipeline executions. - rpc ListPipelineTriggerRecords(ListPipelineTriggerRecordsRequest) returns (ListPipelineTriggerRecordsResponse) { - option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/triggers"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Metric"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "🪆 Namespace" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Get pipeline trigger count @@ -296,32 +444,67 @@ service MgmtPublicService { // Returns the pipeline trigger count of a given requester within a timespan. // Results are grouped by trigger status. rpc GetPipelineTriggerCount(GetPipelineTriggerCountRequest) returns (GetPipelineTriggerCountResponse) { - option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/trigger-count"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Metric"}; + option (google.api.http) = {get: "/v1beta/pipeline-runs/count"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; // This endpoint will remain hidden until the new dashboard is implemented // in the frontend. Until then, the server might return empty data. option (google.api.method_visibility).restriction = "INTERNAL"; } - // List pipeline trigger metrics + // Get model trigger count // - // Returns a paginated list of pipeline executions aggregated by pipeline ID. - // NOTE: This method is deprecated and will be retired soon. - rpc ListPipelineTriggerTableRecords(ListPipelineTriggerTableRecordsRequest) returns (ListPipelineTriggerTableRecordsResponse) { - option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/tables"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Metric"}; - option deprecated = true; + // Returns the model trigger count of a given requester within a timespan. + // Results are grouped by trigger status. + rpc GetModelTriggerCount(GetModelTriggerCountRequest) returns (GetModelTriggerCountResponse) { + option (google.api.http) = {get: "/v1beta/model-runs/count"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; + // This endpoint will remain hidden until the new dashboard is implemented + // in the frontend. Until then, the server might return empty data. option (google.api.method_visibility).restriction = "INTERNAL"; } // List pipeline trigger time charts // - // Returns a timeline of pipline trigger counts for the pipelines of a given - // owner. - // NOTE: This method will soon return the trigger counts of a given requester. + // Returns a timeline of pipline trigger counts for a given requester. The + // response will contain one set of records (datapoints), representing the + // amount of triggers in a time bucket. rpc ListPipelineTriggerChartRecords(ListPipelineTriggerChartRecordsRequest) returns (ListPipelineTriggerChartRecordsResponse) { - option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/charts"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Metric"}; + option (google.api.http) = {get: "/v1beta/pipeline-runs/query-charts"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; + } + + // List model trigger time charts + // + // Returns a timeline of model trigger counts for a given requester. The + // response will contain one set of records (datapoints), representing the + // amount of triggers in a time bucket. + rpc ListModelTriggerChartRecords(ListModelTriggerChartRecordsRequest) returns (ListModelTriggerChartRecordsResponse) { + option (google.api.http) = {get: "/v1beta/model-runs/query-charts"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // List Instill Credit consumption time charts @@ -330,9 +513,17 @@ service MgmtPublicService { // response will contain one set of records (datapoints) per consumption // source (e.g. "pipeline", "model"). Each datapoint represents the amount // consumed in a time bucket. + // + // This endpoint is only exposed on Instill Cloud. rpc ListCreditConsumptionChartRecords(ListCreditConsumptionChartRecordsRequest) returns (ListCreditConsumptionChartRecordsResponse) { - option (google.api.http) = {get: "/v1beta/metrics/credit/charts"}; - option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Metric"}; + option (google.api.http) = {get: "/v1beta/credit/query-charts"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; } // Auth endpoints are only used in the community edition and the OpenAPI @@ -384,4 +575,63 @@ service MgmtPublicService { option (google.api.http) = {post: "/v1beta/auth/validate_access_token"}; option (google.api.method_visibility).restriction = "INTERNAL"; } + + // =========================================================================== + // Deprecated endpoints, to be retired after new pipeline dashboard is rolled + // out. + // =========================================================================== + + // List pipeline triggers + // + // Returns a paginated list of pipeline executions. + // NOTE: This method is deprecated and will be retired soon. + rpc ListPipelineTriggerRecords(ListPipelineTriggerRecordsRequest) returns (ListPipelineTriggerRecordsResponse) { + option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/triggers"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; + option deprecated = true; + option (google.api.method_visibility).restriction = "INTERNAL"; + } + + + // List pipeline trigger metrics + // + // Returns a paginated list of pipeline executions aggregated by pipeline ID. + // NOTE: This method is deprecated and will be retired soon. + rpc ListPipelineTriggerTableRecords(ListPipelineTriggerTableRecordsRequest) returns (ListPipelineTriggerTableRecordsResponse) { + option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/tables"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; + option deprecated = true; + option (google.api.method_visibility).restriction = "INTERNAL"; + } + + // List pipeline trigger time charts + // + // Returns a timeline of pipline trigger counts for the pipelines of a given + // owner. + // NOTE: This method will soon be retired and replaced by + // ListPipelineTriggerchartRecords. + rpc ListPipelineTriggerChartRecordsV0(ListPipelineTriggerChartRecordsV0Request) returns (ListPipelineTriggerChartRecordsV0Response) { + option (google.api.http) = {get: "/v1beta/metrics/vdp/pipeline/charts"}; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + tags: "📊 Metrics" + extensions: { + key: "x-stage" + value: {string_value: "beta"} + } + }; + option deprecated = true; + option (google.api.method_visibility).restriction = "INTERNAL"; + } } diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index 3c2420a..72346d3 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -853,6 +853,39 @@ func (h *PublicHandler) ValidateToken(ctx context.Context, req *mgmtPB.ValidateT return &mgmtPB.ValidateTokenResponse{UserUid: userUID}, nil } +// GetPipelineTriggerCount returns the pipeline trigger count of a given +// requester within a timespan. Results are grouped by trigger status. +func (h *PublicHandler) GetPipelineTriggerCount(ctx context.Context, req *mgmtPB.GetPipelineTriggerCountRequest) (*mgmtPB.GetPipelineTriggerCountResponse, error) { + eventName := "GetPipelineTriggerCount" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp, err := h.Service.GetPipelineTriggerCount(ctx, req, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, fmt.Errorf("fetching pipeline trigger count: %w", err) + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + + return resp, nil +} + // GetModelTriggerCount returns the model trigger count of a given // requester within a timespan. Results are grouped by trigger status. func (h *PublicHandler) GetModelTriggerCount(ctx context.Context, req *mgmtPB.GetModelTriggerCountRequest) (*mgmtPB.GetModelTriggerCountResponse, error) { @@ -873,7 +906,7 @@ func (h *PublicHandler) GetModelTriggerCount(ctx context.Context, req *mgmtPB.Ge resp, err := h.Service.GetModelTriggerCount(ctx, req, ctxUserUID) if err != nil { span.SetStatus(1, err.Error()) - return nil, fmt.Errorf("fetching credit chart records: %w", err) + return nil, fmt.Errorf("fetching model trigger count: %w", err) } logger.Info(string(custom_otel.NewLogMessage( @@ -1015,12 +1048,45 @@ func (h *PublicHandler) ListPipelineTriggerTableRecords(ctx context.Context, req // ListPipelineTriggerChartRecords returns a timeline of a requester's pipeline // trigger count. func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) { - eventName := "ListPipelineTriggerChartRecords" ctx, span := tracer.Start(ctx, eventName, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp, err := h.Service.ListPipelineTriggerChartRecords(ctx, req, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, fmt.Errorf("fetching credit chart records: %w", err) + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + + return resp, nil +} + +// ListPipelineTriggerChartRecordsV0 returns a timeline of a requester's pipeline +// trigger count. +func (h *PublicHandler) ListPipelineTriggerChartRecordsV0(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsV0Request) (*mgmtPB.ListPipelineTriggerChartRecordsV0Response, error) { + + eventName := "ListPipelineTriggerChartRecordsV0" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + logUUID, _ := uuid.NewV4() logger, _ := logger.GetZapLogger(ctx) @@ -1062,13 +1128,13 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req return nil, err } - pipelineTriggerChartRecords, err := h.Service.ListPipelineTriggerChartRecords(ctx, pbUser, int64(req.GetAggregationWindow()), filter) + pipelineTriggerChartRecords, err := h.Service.ListPipelineTriggerChartRecordsV0(ctx, pbUser, int64(req.GetAggregationWindow()), filter) if err != nil { span.SetStatus(1, err.Error()) return nil, err } - resp := mgmtPB.ListPipelineTriggerChartRecordsResponse{ + resp := mgmtPB.ListPipelineTriggerChartRecordsV0Response{ PipelineTriggerChartRecords: pipelineTriggerChartRecords, } diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 4a7135a..6da7b9c 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -34,9 +34,13 @@ var defaultAggregationWindow = time.Hour.Nanoseconds() type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) - QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) - GetModelTriggerCount(ctx context.Context, p GetModelTriggerCountParams) (*mgmtpb.GetModelTriggerCountResponse, error) - ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) + QueryPipelineTriggerChartRecordsV0(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecordV0, err error) + + ListPipelineTriggerChartRecords(context.Context, ListTriggerChartRecordsParams) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) + GetPipelineTriggerCount(context.Context, GetTriggerCountParams) (*mgmtpb.GetPipelineTriggerCountResponse, error) + + ListModelTriggerChartRecords(context.Context, ListTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) + GetModelTriggerCount(context.Context, GetTriggerCountParams) (*mgmtpb.GetModelTriggerCountResponse, error) Bucket() string QueryAPI() api.QueryAPI @@ -112,16 +116,6 @@ func (i *influxDB) Bucket() string { return i.bucket } -// AggregationWindowOffset computes the offset to apply to InfluxDB's -// aggregateWindow function when aggregating by day. This function computes -// windows independently, starting from the Unix epoch, rather than from the -// provided time range start. This function computes the offset to shift the -// windows correctly. -func AggregationWindowOffset(t time.Time) time.Duration { - startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) - return t.Sub(startOfDay) -} - func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) { logger, _ := logger.GetZapLogger(ctx) @@ -312,7 +306,7 @@ func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner s return records, int64(len(records)), pageToken, nil } -func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) { +func (i *influxDB) QueryPipelineTriggerChartRecordsV0(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecordV0, err error) { logger, _ := logger.GetZapLogger(ctx) @@ -399,7 +393,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s } var currentTablePosition = -1 - var chartRecord *mgmtpb.PipelineTriggerChartRecord + var chartRecord *mgmtpb.PipelineTriggerChartRecordV0 // Iterate over query response for result.Next() { @@ -409,7 +403,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s } if result.Record().Table() != currentTablePosition { // only insert a new object when iterated to a new pipeline - chartRecord = &mgmtpb.PipelineTriggerChartRecord{} + chartRecord = &mgmtpb.PipelineTriggerChartRecordV0{} if v, match := result.Record().ValueByKey(constant.PipelineID).(string); match { chartRecord.PipelineId = v @@ -451,38 +445,48 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s return records, nil } -const qModelTriggerCount = ` +const ( + pipelineTriggerMeasurement = "pipeline.trigger.v1" + modelTriggerMeasurement = "model.trigger.v1" +) + +const qTriggerCount = ` from(bucket: "%s") |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") + |> filter(fn: (r) => r._measurement == "%s" and r.requester_uid == "%s") |> filter(fn: (r) => r._field == "trigger_time") |> group(columns: ["requester_uid", "status"]) |> count(column: "_value") ` -// GetModelTriggerCountParams contains the required information to -// query the model trigger count of a namespace. +// GetTriggerCountParams contains the required information to query the +// pipeline or model trigger count of a namespace. // TODO jvallesm: this should be defined in the service package for better // decoupling. At the moment this implies breaking an import cycle with many // dependencies. -type GetModelTriggerCountParams struct { +type GetTriggerCountParams struct { RequesterUID uuid.UUID Start time.Time Stop time.Time } -func (i *influxDB) GetModelTriggerCount( +func (i *influxDB) getTriggerCount( ctx context.Context, - p GetModelTriggerCountParams, -) (*mgmtpb.GetModelTriggerCountResponse, error) { + p GetTriggerCountParams, + measurement string, +) ([]*mgmtpb.TriggerCount, error) { l, _ := logger.GetZapLogger(ctx) - l = l.With(zap.Reflect("triggerCountParams", p)) + l = l.With( + zap.Reflect("triggerCountParams", p), + zap.String("measurement", measurement), + ) query := fmt.Sprintf( - qModelTriggerCount, + qTriggerCount, i.Bucket(), p.Start.Format(time.RFC3339Nano), p.Stop.Format(time.RFC3339Nano), + measurement, p.RequesterUID.String(), ) result, err := i.QueryAPI().Query(ctx, query) @@ -505,7 +509,7 @@ func (i *influxDB) GetModelTriggerCount( count, match := result.Record().Value().(int64) if !match { - l.Error("Missing count on model trigger count record.") + l.Error("Missing count on trigger count record.") } countRecords = append(countRecords, &mgmtpb.TriggerCount{ @@ -515,30 +519,76 @@ func (i *influxDB) GetModelTriggerCount( } if result.Err() != nil { - return nil, fmt.Errorf("collecting information from model trigger count records: %w", err) + return nil, fmt.Errorf("collecting information from trigger count records: %w", err) } if result.Record() == nil { return nil, nil } + return countRecords, nil +} + +func (i *influxDB) GetPipelineTriggerCount(ctx context.Context, p GetTriggerCountParams) (*mgmtpb.GetPipelineTriggerCountResponse, error) { + countRecords, err := i.getTriggerCount(ctx, p, pipelineTriggerMeasurement) + if err != nil { + return nil, err + } + + return &mgmtpb.GetPipelineTriggerCountResponse{ + PipelineTriggerCounts: countRecords, + }, nil +} + +func (i *influxDB) GetModelTriggerCount(ctx context.Context, p GetTriggerCountParams) (*mgmtpb.GetModelTriggerCountResponse, error) { + countRecords, err := i.getTriggerCount(ctx, p, modelTriggerMeasurement) + if err != nil { + return nil, err + } + return &mgmtpb.GetModelTriggerCountResponse{ ModelTriggerCounts: countRecords, }, nil } -const qModelTriggerChartRecords = ` +const qTriggerChartRecords = ` from(bucket: "%s") |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") + |> filter(fn: (r) => r._measurement == "%s" and r.requester_uid == "%s") |> filter(fn: (r) => r._field == "trigger_time") |> group(columns:["requester_uid"]) |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s) ` -// ListModelTriggerChartRecordsParams contains the required information to -// query the model triggers of a namespace. -type ListModelTriggerChartRecordsParams struct { +func (i *influxDB) listTriggerChartRecords( + ctx context.Context, + p ListTriggerChartRecordsParams, + measurement string, +) (*api.QueryTableResult, error) { + query := fmt.Sprintf( + qTriggerChartRecords, + i.Bucket(), + p.Start.Format(time.RFC3339Nano), + p.Stop.Format(time.RFC3339Nano), + measurement, + p.RequesterUID.String(), + p.AggregationWindow, + AggregationWindowOffset(p.Start).String(), + ) + result, err := i.QueryAPI().Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + } + + return result, nil +} + +// ListTriggerChartRecordsParams contains the required information to query the +// triggers of a requester. +// TODO jvallesm: this should be defined in the service package for better +// decoupling. At the moment this implies breaking an import cycle with many +// dependencies. +type ListTriggerChartRecordsParams struct { RequesterID string RequesterUID uuid.UUID AggregationWindow time.Duration @@ -546,25 +596,72 @@ type ListModelTriggerChartRecordsParams struct { Stop time.Time } +func (i *influxDB) ListPipelineTriggerChartRecords( + ctx context.Context, + p ListTriggerChartRecordsParams, +) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) { + l, _ := logger.GetZapLogger(ctx) + l = l.With( + zap.Reflect("triggerChartParams", p), + zap.String("measurement", pipelineTriggerMeasurement), + ) + + result, err := i.listTriggerChartRecords(ctx, p, pipelineTriggerMeasurement) + if err != nil { + return nil, err + } + + defer result.Close() + + record := &mgmtpb.PipelineTriggerChartRecord{ + RequesterId: p.RequesterID, + TimeBuckets: []*timestamppb.Timestamp{}, + TriggerCounts: []int32{}, + } + + // Until filtering and grouping are implemented, we'll only have one record + // (total triggers by requester). + records := []*mgmtpb.PipelineTriggerChartRecord{record} + + for result.Next() { + t := result.Record().Time() + record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t)) + + v, match := result.Record().Value().(int64) + if !match { + l.With(zap.Time("_time", result.Record().Time())). + Error("Missing count on trigger chart record.") + } + + record.TriggerCounts = append(record.TriggerCounts, int32(v)) + } + + if result.Err() != nil { + return nil, fmt.Errorf("collecting information from trigger chart records: %w", err) + } + + if result.Record() == nil { + return nil, nil + } + + return &mgmtpb.ListPipelineTriggerChartRecordsResponse{ + PipelineTriggerChartRecords: records, + }, nil +} + func (i *influxDB) ListModelTriggerChartRecords( ctx context.Context, - p ListModelTriggerChartRecordsParams, + p ListTriggerChartRecordsParams, ) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { l, _ := logger.GetZapLogger(ctx) - l = l.With(zap.Reflect("triggerChartParams", p)) - - query := fmt.Sprintf( - qModelTriggerChartRecords, - i.Bucket(), - p.Start.Format(time.RFC3339Nano), - p.Stop.Format(time.RFC3339Nano), - p.RequesterUID.String(), - p.AggregationWindow, - AggregationWindowOffset(p.Start).String(), + l = l.With( + zap.Reflect("triggerChartParams", p), + zap.String("measurement", modelTriggerMeasurement), ) - result, err := i.QueryAPI().Query(ctx, query) + + result, err := i.listTriggerChartRecords(ctx, p, modelTriggerMeasurement) if err != nil { - return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + return nil, err } defer result.Close() @@ -773,3 +870,13 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string } return records, int64(len(records)), pageToken, nil } + +// AggregationWindowOffset computes the offset to apply to InfluxDB's +// aggregateWindow function when aggregating by day. This function computes +// windows independently, starting from the Unix epoch, rather than from the +// provided time range start. This function computes the offset to shift the +// windows correctly. +func AggregationWindowOffset(t time.Time) time.Duration { + startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) + return t.Sub(startOfDay) +} diff --git a/pkg/service/metric.go b/pkg/service/metric.go index ffc76bc..bcb3b42 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -144,19 +144,60 @@ func (s *service) ListPipelineTriggerTableRecords(ctx context.Context, owner *mg return pipelineTriggerTableRecords, ps, pt, nil } -func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) { +func (s *service) ListPipelineTriggerChartRecords( + ctx context.Context, + req *mgmtpb.ListPipelineTriggerChartRecordsRequest, + ctxUserUID uuid.UUID, +) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) { + nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.ListTriggerChartRecordsParams{ + RequesterID: req.GetRequesterId(), + RequesterUID: nsUID, + + // Default values + AggregationWindow: 1 * time.Hour, + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetAggregationWindow() != "" { + window, err := time.ParseDuration(req.GetAggregationWindow()) + if err != nil { + return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err) + } + + p.AggregationWindow = window + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.ListPipelineTriggerChartRecords(ctx, p) +} + +func (s *service) ListPipelineTriggerChartRecordsV0(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecordV0, error) { ownerUID, ownerID, ownerType, ownerQueryString, filter, err := s.checkPipelineOwnership(ctx, filter, owner) if err != nil { - return []*mgmtpb.PipelineTriggerChartRecord{}, err + return []*mgmtpb.PipelineTriggerChartRecordV0{}, err } filter, err = s.pipelineUIDLookup(ctx, ownerID, ownerType, filter, owner) if err != nil { - return []*mgmtpb.PipelineTriggerChartRecord{}, nil + return []*mgmtpb.PipelineTriggerChartRecordV0{}, nil } - pipelineTriggerChartRecords, err := s.influxDB.QueryPipelineTriggerChartRecords(ctx, *ownerUID, ownerQueryString, aggregationWindow, filter) + pipelineTriggerChartRecords, err := s.influxDB.QueryPipelineTriggerChartRecordsV0(ctx, *ownerUID, ownerQueryString, aggregationWindow, filter) if err != nil { return nil, err } @@ -164,6 +205,36 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg return pipelineTriggerChartRecords, nil } +func (s *service) GetPipelineTriggerCount( + ctx context.Context, + req *mgmtpb.GetPipelineTriggerCountRequest, + ctxUserUID uuid.UUID, +) (*mgmtpb.GetPipelineTriggerCountResponse, error) { + requesterUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.GetTriggerCountParams{ + RequesterUID: requesterUID, + + // Default values + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.GetPipelineTriggerCount(ctx, p) +} + func (s *service) GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) { requesterUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) if err != nil { @@ -171,7 +242,7 @@ func (s *service) GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModel } now := time.Now().UTC() - p := repository.GetModelTriggerCountParams{ + p := repository.GetTriggerCountParams{ RequesterUID: requesterUID, // Default values @@ -201,7 +272,7 @@ func (s *service) ListModelTriggerChartRecords( } now := time.Now().UTC() - p := repository.ListModelTriggerChartRecordsParams{ + p := repository.ListTriggerChartRecordsParams{ RequesterID: req.GetRequesterId(), RequesterUID: nsUID, diff --git a/pkg/service/service.go b/pkg/service/service.go index 778c1a6..ec99cbe 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -72,8 +72,10 @@ type Service interface { ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error) - ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) - GetModelTriggerCount(ctx context.Context, req *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) + ListPipelineTriggerChartRecords(_ context.Context, _ *mgmtpb.ListPipelineTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) + ListPipelineTriggerChartRecordsV0(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecordV0, error) + GetPipelineTriggerCount(_ context.Context, _ *mgmtpb.GetPipelineTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetPipelineTriggerCountResponse, error) + GetModelTriggerCount(_ context.Context, _ *mgmtpb.GetModelTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtpb.GetModelTriggerCountResponse, error) ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)