Skip to content

Commit

Permalink
Merge pull request #1129 from tigrisdata/main
Browse files Browse the repository at this point in the history
Latest beta
  • Loading branch information
garrensmith authored May 3, 2023
2 parents b53892d + f8064b4 commit 2db6bd8
Show file tree
Hide file tree
Showing 37 changed files with 982 additions and 119 deletions.
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from f346cf to d5c440
13 changes: 13 additions & 0 deletions api/server/v1/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package api

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -228,6 +229,18 @@ func (e *TigrisError) GRPCStatus() *status.Status {
return st
}

// ToAPIError is used when it is needed to manually produce error in wire format.
// For example in the batch API like CreateOrUpdateCollections.
func ToAPIError(err error) *Error {
var ep *TigrisError

if errors.As(err, &ep) {
return &Error{Code: ep.Code, Message: ep.Message}
}

return &Error{Code: Code_INTERNAL, Message: err.Error()}
}

// MarshalStatus marshal status object.
func MarshalStatus(status *spb.Status) ([]byte, error) {
resp := struct {
Expand Down
43 changes: 43 additions & 0 deletions api/server/v1/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
jsoniter "github.com/json-iterator/go"
"github.com/tigrisdata/tigris/util"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -526,6 +527,48 @@ func (x *CreateOrUpdateCollectionRequest) UnmarshalJSON(data []byte) error {
return nil
}

// UnmarshalJSON on CreateCollectionsRequest avoids unmarshalling schemas. The req handler deserializes the schema.
func (x *CreateOrUpdateCollectionsRequest) UnmarshalJSON(data []byte) error {
var mp map[string]jsoniter.RawMessage

if err := jsoniter.Unmarshal(data, &mp); err != nil {
return err
}

for key, value := range mp {
var v interface{}

switch key {
case "project":
v = &x.Project
case "branch":
v = &x.Branch
case "only_create":
v = &x.OnlyCreate
case "schemas":
var schemas []jsoniter.RawMessage

if err := jsoniter.Unmarshal(value, &schemas); err != nil {
return err
}

x.Schemas = util.RawMessageToByte(schemas)

continue
case "options":
v = &x.Options
default:
continue
}

if err := jsoniter.Unmarshal(value, v); err != nil {
return err
}
}

return nil
}

var operations = map[string]TigrisOperation{
"ALL": TigrisOperation_ALL,
"READ": TigrisOperation_READ,
Expand Down
3 changes: 2 additions & 1 deletion api/server/v1/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const (
DeleteMethodName = apiMethodPrefix + "Delete"
ReadMethodName = apiMethodPrefix + "Read"

IndexCollection = apiMethodPrefix + "IndexCollection"
IndexCollection = apiMethodPrefix + "IndexCollection"
SearchIndexCollectionMethodName = apiMethodPrefix + "BuildSearchIndex"

SearchMethodName = apiMethodPrefix + "Search"

Expand Down
20 changes: 11 additions & 9 deletions cmd/consistency/workload/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package workload

import (
"fmt"
"math/rand"
"time"

"github.com/brianvoe/gofakeit/v6"
Expand All @@ -29,8 +29,8 @@ type IDocument interface {
}

type Document struct {
Id int64 `json:"id"`
F2 string
Id int64 `json:"id"`
F2 string `json:"F2" fake:"{sentence:50}"`
F3 []byte
F4 uuid.UUID
F5 time.Time
Expand All @@ -41,13 +41,15 @@ func (d *Document) ID() int64 {
}

func NewDocument(uniqueId int64) *Document {
return &Document{
Id: uniqueId,
F2: fmt.Sprintf("id_%d", uniqueId),
F3: []byte(`1234`),
F4: uuid.New(),
F5: time.Now().UTC(),
var document Document
if err := gofakeit.Struct(&document); err != nil {
log.Panic().Err(err).Msgf("failed in generating fake document")
}
document.Id = uniqueId
document.F3 = []byte(document.F2[0:rand.Intn(len(document.F2))]) //nolint:gosec
document.F4 = uuid.New()
document.F5 = time.Now().UTC()
return &document
}

func SerializeDocV1(doc *DocumentV1) ([]byte, error) {
Expand Down
11 changes: 7 additions & 4 deletions server/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,15 @@ type Billing struct {
}

type Metronome struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" json:"enabled"`
URL string `mapstructure:"url" yaml:"url" json:"url"`
ApiKey string `mapstructure:"api_key" yaml:"api_key" json:"api_key"`
DefaultPlan string `mapstructure:"default_plan" yaml:"default_plan" json:"default_plan"`
Enabled bool `mapstructure:"enabled" yaml:"enabled" json:"enabled"`
URL string `mapstructure:"url" yaml:"url" json:"url"`
ApiKey string `mapstructure:"api_key" yaml:"api_key" json:"api_key"`
DefaultPlan string `mapstructure:"default_plan" yaml:"default_plan" json:"default_plan"`
BilledMetrics BilledMetrics `mapstructure:"billed_metrics" yaml:"billed_metrics" json:"billed_metrics"`
}

type BilledMetrics = map[string]string

type BillingReporter struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" json:"enabled"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" yaml:"refresh_interval" json:"refresh_interval"`
Expand Down
8 changes: 4 additions & 4 deletions server/metrics/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func FormDatadogQueryNoMeta(namespace string, noMeta bool, req *api.QueryTimeSer
switch {
case req.TigrisOperation == api.TigrisOperation_WRITE:
if noMeta {
tags = append(tags, "grpc_method IN (createproject,deleteproject,createorupdatecollection,dropcollection,insert,update,delete,replace,publish)")
tags = append(tags, "grpc_method IN (createproject,deleteproject,createorupdatecollection,dropcollection,insert,update,delete,replace,publish,buildcollectionindex,import,createbranch,deletebranch,createorupdateindex,deleteindex,createbyid,create,createorreplace,update,delete,deletebyquery)")
} else {
tags = append(tags, "grpc_method IN (insert,update,delete,replace,publish)")
tags = append(tags, "grpc_method IN (insert,update,delete,replace,publish,buildcollectionindex,import,createbyid,create,createorreplace,update,delete,deletebyquery)")
}
case req.TigrisOperation == api.TigrisOperation_READ:
if noMeta {
tags = append(tags, "grpc_method IN (listprojects,listcollections,describeproject,describecollection, read,search,subscribe)")
tags = append(tags, "grpc_method IN (listprojects,listcollections,describedatabase,describecollection,read,search,subscribe,count,explain,listbranches,getindex,listindexes)")
} else {
tags = append(tags, "grpc_method IN (read,search,subscribe)")
tags = append(tags, "grpc_method IN (read,search,subscribe,explain,get)")
}
case req.TigrisOperation == api.TigrisOperation_METADATA:
tags = append(tags, "grpc_method IN (createorupdatecollection,dropcollection,listprojects,listcollections,createproject,deleteproject,describeproject,describecollection)")
Expand Down
8 changes: 4 additions & 4 deletions server/metrics/datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestDatadogQueryFormation(t *testing.T) {
}
formedQuery, err = FormDatadogQuery("", req)
require.NoError(t, err)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (read,search,subscribe) AND project:db1 AND collection:col1} by {db,collection}.as_rate()", formedQuery)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (read,search,subscribe,explain,get) AND project:db1 AND collection:col1} by {db,collection}.as_rate()", formedQuery)

req = &api.QueryTimeSeriesMetricsRequest{
Db: "db1",
Expand All @@ -151,7 +151,7 @@ func TestDatadogQueryFormation(t *testing.T) {
}
formedQuery, err = FormDatadogQuery("", req)
require.NoError(t, err)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (insert,update,delete,replace,publish) AND project:db1 AND collection:col1} by {db,collection}.as_rate()", formedQuery)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (insert,update,delete,replace,publish,buildcollectionindex,import,createbyid,create,createorreplace,update,delete,deletebyquery) AND project:db1 AND collection:col1} by {db,collection}.as_rate()", formedQuery)

req = &api.QueryTimeSeriesMetricsRequest{
From: 1,
Expand All @@ -163,7 +163,7 @@ func TestDatadogQueryFormation(t *testing.T) {
}
formedQuery, err = FormDatadogQuery("", req)
require.NoError(t, err)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (insert,update,delete,replace,publish)}.as_rate()", formedQuery)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (insert,update,delete,replace,publish,buildcollectionindex,import,createbyid,create,createorreplace,update,delete,deletebyquery)}.as_rate()", formedQuery)

req = &api.QueryTimeSeriesMetricsRequest{
From: 1,
Expand All @@ -175,7 +175,7 @@ func TestDatadogQueryFormation(t *testing.T) {
}
formedQuery, err = FormDatadogQuery("", req)
require.NoError(t, err)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (read,search,subscribe)}.as_rate()", formedQuery)
require.Equal(t, "sum:requests_count_ok.count{grpc_method IN (read,search,subscribe,explain,get)}.as_rate()", formedQuery)

req = &api.QueryTimeSeriesMetricsRequest{
From: 1,
Expand Down
2 changes: 1 addition & 1 deletion server/metrics/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (m *Measurement) RecordDuration(scope tally.Scope, tags map[string]string)
case SecondaryIndexRespTime, SecondaryIndexErrorRespTime:
timerEnabled = cfg.SecondaryIndex.Timer.TimerEnabled
histogramEnabled = cfg.SecondaryIndex.Timer.HistogramEnabled
case MetronomeCreateAccount, MetronomeAddPlan, MetronomeIngest, MetronomeGetInvoice, MetronomeListInvoices:
case MetronomeCreateAccount, MetronomeAddPlan, MetronomeIngest, MetronomeGetInvoice, MetronomeListInvoices, MetronomeGetUsage:
timerEnabled = true
histogramEnabled = true
}
Expand Down
2 changes: 2 additions & 0 deletions server/metrics/metronome.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
MetronomeIngest tally.Scope
MetronomeListInvoices tally.Scope
MetronomeGetInvoice tally.Scope
MetronomeGetUsage tally.Scope
)

func initializeMetronomeScopes() {
Expand All @@ -34,6 +35,7 @@ func initializeMetronomeScopes() {
MetronomeIngest = MetronomeMetrics.SubScope("ingest")
MetronomeListInvoices = MetronomeMetrics.SubScope("list_invoices")
MetronomeGetInvoice = MetronomeMetrics.SubScope("get_invoice")
MetronomeGetUsage = MetronomeMetrics.SubScope("get_usage")
}

func GetResponseCodeTags(code int) map[string]string {
Expand Down
8 changes: 6 additions & 2 deletions server/middleware/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func timeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterc
ctx, cancel = setDeadlineUsingHeader(ctx)

d, ok := ctx.Deadline()
if ok && info.FullMethod != api.IndexCollection && time.Until(d) > MaximumTimeout {
if ok && !isLongRunningAPI(info.FullMethod) && time.Until(d) > MaximumTimeout {
timeout = MaximumTimeout
ok = false
}

if !ok && info.FullMethod == api.IndexCollection {
if !ok && isLongRunningAPI(info.FullMethod) {
timeout = LongRunningTimeout
}

Expand All @@ -65,6 +65,10 @@ func timeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterc
}
}

func isLongRunningAPI(method string) bool {
return method == api.IndexCollection || method == api.SearchIndexCollectionMethodName
}

func setDeadlineUsingHeader(ctx context.Context) (context.Context, context.CancelFunc) {
value := api.GetHeader(ctx, api.HeaderRequestTimeout)
if len(value) == 0 {
Expand Down
61 changes: 60 additions & 1 deletion server/services/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/go-chi/chi/v5"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/errors"
Expand Down Expand Up @@ -177,7 +178,7 @@ func (s *apiService) CommitTransaction(ctx context.Context, _ *api.CommitTransac

err := session.Commit(s.versionH, session.GetTx().Context().GetStagedDatabase() != nil, nil)
if err != nil {
return nil, err
return nil, database.CreateApiError(err)
}

return &api.CommitTransactionResponse{}, nil
Expand Down Expand Up @@ -255,6 +256,21 @@ func (s *apiService) BuildCollectionIndex(ctx context.Context, r *api.BuildColle
return resp.Response.(*api.BuildCollectionIndexResponse), nil
}

func (s *apiService) BuildSearchIndex(ctx context.Context, r *api.BuildCollectionSearchIndexRequest) (*api.BuildCollectionSearchIndexResponse, error) {
qm := metrics.WriteQueryMetrics{}
accessToken, _ := request.GetAccessToken(ctx)

resp, err := s.sessions.ReadOnlyExecute(ctx, s.runnerFactory.GetSearchIndexRunner(r, &qm, accessToken), database.ReqOptions{
MetadataChange: true,
InstantVerTracking: true,
})
if err != nil {
return nil, err
}

return resp.Response.(*api.BuildCollectionSearchIndexResponse), nil
}

func (s *apiService) Replace(ctx context.Context, r *api.ReplaceRequest) (*api.ReplaceResponse, error) {
qm := metrics.WriteQueryMetrics{}
accessToken, _ := request.GetAccessToken(ctx)
Expand Down Expand Up @@ -391,6 +407,49 @@ func (s *apiService) CreateOrUpdateCollection(ctx context.Context, r *api.Create
}, nil
}

func (s *apiService) CreateOrUpdateCollections(ctx context.Context, r *api.CreateOrUpdateCollectionsRequest) (*api.CreateOrUpdateCollectionsResponse, error) {
accessToken, _ := request.GetAccessToken(ctx)

resp := &api.CreateOrUpdateCollectionsResponse{
Resp: make([]*api.CreateCollectionStatus, len(r.Schemas)),
}

for i, c := range r.Schemas {
coll := jsoniter.Get(c, "title")
if len(coll.ToString()) == 0 {
resp.FailedAtIndex = int32(i)
resp.Error = &api.Error{Code: api.Code_INVALID_ARGUMENT, Message: "Collection name is empty at index %d"}

return resp, nil
}

runner := s.runnerFactory.GetCollectionQueryRunner(accessToken)
runner.SetCreateOrUpdateCollectionReq(&api.CreateOrUpdateCollectionRequest{
Collection: coll.ToString(),
Schema: c,
OnlyCreate: r.OnlyCreate,
Branch: r.Branch,
Project: r.Project,
})

oneResp, err := s.sessions.Execute(ctx, runner, database.ReqOptions{
TxCtx: api.GetTransaction(ctx),
MetadataChange: true,
InstantVerTracking: true,
})
if err != nil {
resp.Error = api.ToAPIError(err)
resp.FailedAtIndex = int32(i)

return resp, nil
}

resp.Resp[i] = &api.CreateCollectionStatus{Status: oneResp.Status}
}

return resp, nil
}

func (s *apiService) DropCollection(ctx context.Context, r *api.DropCollectionRequest) (*api.DropCollectionResponse, error) {
accessToken, _ := request.GetAccessToken(ctx)
runner := s.runnerFactory.GetCollectionQueryRunner(accessToken)
Expand Down
2 changes: 1 addition & 1 deletion server/services/v1/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (b *billingService) ListInvoices(ctx context.Context, req *api.ListInvoices
return b.GetInvoices(ctx, mId, req)
}

func (b *billingService) getMetronomeId(ctx context.Context, namespaceId string) (billing.MetronomeId, error) {
func (b *billingService) getMetronomeId(ctx context.Context, namespaceId string) (billing.AccountId, error) {
nsMeta := b.nsMgr.GetNamespaceMetadata(ctx, namespaceId)
if nsMeta == nil {
log.Warn().Msgf("Could not find namespace, this must not happen with right authn/authz configured")
Expand Down
Loading

0 comments on commit 2db6bd8

Please sign in to comment.