Skip to content

Commit

Permalink
Relationships selected in SQL-based datastores should elide
Browse files Browse the repository at this point in the history
columns that have static values

This vastly reduces data over the wire, as well as deserialization time and memory usage
  • Loading branch information
josephschorr committed Oct 27, 2024
1 parent c11aac6 commit 22b8fb8
Show file tree
Hide file tree
Showing 21 changed files with 625 additions and 641 deletions.
154 changes: 154 additions & 0 deletions internal/datastore/common/relationships.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package common

import (
"context"
"database/sql"
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/authzed/spicedb/pkg/datastore"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

const errUnableToQueryRels = "unable to query relationships: %w"

// StaticValueOrAddColumnForSelect adds a column to the list of columns to select if the value
// is not static, otherwise it sets the value to the static value.
func StaticValueOrAddColumnForSelect(colsToSelect []any, queryInfo QueryInfo, colName string, field *string) []any {
// If the value is static, set the field to it and return.
if found, ok := queryInfo.FilteringValues[colName]; ok && found.SingleValue != nil {
*field = *found.SingleValue
return colsToSelect
}

// Otherwise, add the column to the list of columns to select, as the value is not static.
colsToSelect = append(colsToSelect, field)
return colsToSelect
}

// Querier is an interface for querying the database.
type Querier[R Rows] interface {
QueryFunc(ctx context.Context, f func(context.Context, R) error, sql string, args ...any) error
}

// Rows is a common interface for database rows reading.
type Rows interface {
Scan(dest ...any) error
Next() bool
Err() error
}

type closeRowsWithError interface {
Rows
Close() error
}

type closeRows interface {
Rows
Close()
}

// QueryRelationships queries relationships for the given query and transaction.
func QueryRelationships[R Rows, C ~map[string]any](ctx context.Context, queryInfo QueryInfo, sqlStatement string, args []any, span trace.Span, tx Querier[R], withIntegrity bool) (datastore.RelationshipIterator, error) {
defer span.End()

colsToSelect := make([]any, 0, 8)
var resourceObjectType string
var resourceObjectID string
var resourceRelation string
var subjectObjectType string
var subjectObjectID string
var subjectRelation string
var caveatName sql.NullString
var caveatCtx C

var integrityKeyID string
var integrityHash []byte
var timestamp time.Time

colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColNamespace, &resourceObjectType)
colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColObjectID, &resourceObjectID)
colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColRelation, &resourceRelation)
colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColUsersetNamespace, &subjectObjectType)
colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColUsersetObjectID, &subjectObjectID)
colsToSelect = StaticValueOrAddColumnForSelect(colsToSelect, queryInfo, queryInfo.Schema.ColUsersetRelation, &subjectRelation)

colsToSelect = append(colsToSelect, &caveatName, &caveatCtx)
if withIntegrity {
colsToSelect = append(colsToSelect, &integrityKeyID, &integrityHash, &timestamp)
}

return func(yield func(tuple.Relationship, error) bool) {
err := tx.QueryFunc(ctx, func(ctx context.Context, rows R) error {
var r Rows = rows
if crwe, ok := r.(closeRowsWithError); ok {
defer LogOnError(ctx, crwe.Close)
} else if cr, ok := r.(closeRows); ok {
defer cr.Close()
}

span.AddEvent("Query issued to database")
relCount := 0
for rows.Next() {
if err := rows.Scan(colsToSelect...); err != nil {
return fmt.Errorf(errUnableToQueryRels, fmt.Errorf("scan err: %w", err))
}

var caveat *corev1.ContextualizedCaveat
if caveatName.Valid {
var err error
caveat, err = ContextualizedCaveatFrom(caveatName.String, caveatCtx)
if err != nil {
return fmt.Errorf(errUnableToQueryRels, fmt.Errorf("unable to fetch caveat context: %w", err))
}
}

var integrity *corev1.RelationshipIntegrity
if integrityKeyID != "" {
integrity = &corev1.RelationshipIntegrity{
KeyId: integrityKeyID,
Hash: integrityHash,
HashedAt: timestamppb.New(timestamp),
}
}

relCount++
if !yield(tuple.Relationship{
RelationshipReference: tuple.RelationshipReference{
Resource: tuple.ObjectAndRelation{
ObjectType: resourceObjectType,
ObjectID: resourceObjectID,
Relation: resourceRelation,
},
Subject: tuple.ObjectAndRelation{
ObjectType: subjectObjectType,
ObjectID: subjectObjectID,
Relation: subjectRelation,
},
},
OptionalCaveat: caveat,
OptionalIntegrity: integrity,
}, nil) {
return nil
}
}

if err := rows.Err(); err != nil {
return fmt.Errorf(errUnableToQueryRels, fmt.Errorf("rows err: %w", err))
}

span.AddEvent("Rels loaded", trace.WithAttributes(attribute.Int("relCount", relCount)))
return nil
}, sqlStatement, args...)
if err != nil {
if !yield(tuple.Relationship{}, err) {
return
}
}
}, nil
}
File renamed without changes.
Loading

0 comments on commit 22b8fb8

Please sign in to comment.