Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parse consistency from headers on kube request #74

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/authz/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ func runAllMatchingChecks(ctx context.Context, matchingRules []*rules.RunnableRu
return err
}
req := &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
},
Consistency: input.Consistency,
Resource: &v1.ObjectReference{
ObjectType: rel.ResourceType,
ObjectId: rel.ResourceID,
Expand Down
36 changes: 21 additions & 15 deletions pkg/authz/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"slices"
"time"

"github.com/kyverno/go-jmespath"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -79,9 +80,7 @@ func filterList(ctx context.Context, client v1.PermissionsServiceClient, filter
defer close(authzData.removedNNC)

req := &v1.LookupResourcesRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
},
Consistency: input.Consistency,
ResourceObjectType: filter.Rel.ResourceType,
Permission: filter.Rel.ResourceRelation,
Subject: &v1.SubjectReference{
Expand Down Expand Up @@ -173,6 +172,10 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
defer close(authzData.allowedNNC)
defer close(authzData.removedNNC)

logger := klog.LoggerWithValues(klog.FromContext(ctx), "request", "watch", "filter", filter).WithCallDepth(1)

logger.V(3).Info("started watch")

watchResource, err := watchClient.Watch(ctx, &v1.WatchRequest{
OptionalObjectTypes: []string{filter.Rel.ResourceType},
})
Expand All @@ -187,14 +190,18 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
}

if err != nil {
fmt.Println(err)
logger.V(2).Error(err, "watch error")
return
}

time.Sleep(input.WatchDelay)

for _, u := range resp.Updates {
cr, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
Requirement: &v1.Consistency_AtLeastAsFresh{
AtLeastAsFresh: resp.ChangesThrough,
},
},
Resource: &v1.ObjectReference{
ObjectType: filter.Rel.ResourceType,
Expand All @@ -211,46 +218,45 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
},
})
if err != nil {
fmt.Println(err)
logger.V(2).Error(err, "check permission error")
return
}

byteIn, err := json.Marshal(wrapper{ResourceID: u.Relationship.Resource.ObjectId, SubjectID: u.Relationship.Subject.Object.ObjectId})
if err != nil {
fmt.Println(err)
logger.V(2).Error(err, "marshal error")
return
}
var data any
if err := json.Unmarshal(byteIn, &data); err != nil {
fmt.Println(err)
logger.V(2).Error(err, "unmarshal error")
return
}
fmt.Println(data)
fmt.Println("RESPONSE", string(byteIn))

logger.V(5).Info("response", "data", data)

name, err := filter.Name.Search(data)
if err != nil {
fmt.Println(err)
klog.V(2).ErrorS(err, "error extracting name")
return
}
fmt.Println("GOT NAME", name)
if name == nil || len(name.(string)) == 0 {
return
}
namespace, err := filter.Namespace.Search(data)
if err != nil {
fmt.Println(err)
logger.V(2).Error(err, "namespace extract error")
return
}
fmt.Println("GOT NAMESPACE", namespace)
if namespace == nil {
namespace = ""
}
nn := types.NamespacedName{Name: name.(string), Namespace: namespace.(string)}
logger.V(4).Info("response object", "namespacedName", nn.String())

// TODO: this should really be over a single channel to prevent
// races on add/remove
fmt.Println(u.Relationship.Resource.ObjectId, cr.Permissionship)
logger.V(4).Info("result", "object", u.Relationship.Resource.ObjectId, "permission", cr.Permissionship)
if cr.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION {
authzData.allowedNNC <- nn
} else {
Expand Down
79 changes: 79 additions & 0 deletions pkg/rules/consistency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package rules

import (
"net/http"
"strings"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
)

const (
WatchDelayKey = "SpiceDB-Watch-Delay"
ConsistencyHeaderKey = "SpiceDB-Consistency"
FullyConsistentHeaderValue = "Full"
AtExactSnapshotHeaderValuePrefix = "Exact "
AtLeastAsFreshHeaderValuePrefix = "At-Least "
)

var MinimizeLatency = &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
}

var FullyConsistent = &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
}

// ConsistencyFromHeaders returns a consistency block given a consistency
// header. Defaults to Minimize Latency, otherwise returns the first match in order:
// - Fully Consistent
// - At Exact Snapshot
// - At Least As Fresh
// - Minimize Latency
func ConsistencyFromHeaders(headers http.Header) *v1.Consistency {
consistencyValue := headers.Get(ConsistencyHeaderKey)
if len(consistencyValue) == 0 {
return MinimizeLatency
}

switch consistencyValue[0] {
case 'F':
fallthrough
case 'f':
return FullyConsistent
case 'E':
fallthrough
case 'e':
return &v1.Consistency{
Requirement: &v1.Consistency_AtExactSnapshot{
AtExactSnapshot: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtExactSnapshotHeaderValuePrefix)},
},
}
case 'A':
fallthrough
case 'a':
return &v1.Consistency{
Requirement: &v1.Consistency_AtLeastAsFresh{
AtLeastAsFresh: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtLeastAsFreshHeaderValuePrefix)},
},
}
}

return MinimizeLatency
}

// WatchDelayFromHeaders returns a time duration from the delay header.
// This is used to delay sending the check request to SpiceDB on a watch event,
// so that non-fully-consistent modes can be used with watch.
func WatchDelayFromHeaders(headers http.Header) time.Duration {
delay := headers.Get(WatchDelayKey)
if len(delay) == 0 {
return 0
}

duration, err := time.ParseDuration(delay)
if err != nil {
return 0
}
return duration
}
107 changes: 107 additions & 0 deletions pkg/rules/consistency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package rules

import (
"net/http"
"reflect"
"testing"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/spicedb/pkg/datastore/revision"
"github.com/authzed/spicedb/pkg/zedtoken"
"github.com/shopspring/decimal"
)

func MustIntZedToken(t int64) string {
return zedtoken.MustNewFromRevision(revision.NewFromDecimal(decimal.NewFromInt(t))).String()
}

func TestConsistencyFromHeader(t *testing.T) {
tests := []struct {
name string
headers http.Header
want *v1.Consistency
}{
{
name: "defaults to minimize latency",
want: MinimizeLatency,
},
{
name: "fully consistent",
headers: map[string][]string{
ConsistencyHeaderKey: {FullyConsistentHeaderValue},
},
want: FullyConsistent,
},
{
name: "at exact snapshot",
headers: map[string][]string{
ConsistencyHeaderKey: {AtExactSnapshotHeaderValuePrefix + MustIntZedToken(5)},
},
want: &v1.Consistency{
Requirement: &v1.Consistency_AtExactSnapshot{
AtExactSnapshot: &v1.ZedToken{Token: MustIntZedToken(5)},
},
},
},
{
name: "at least as fresh",
headers: map[string][]string{
ConsistencyHeaderKey: {AtLeastAsFreshHeaderValuePrefix + MustIntZedToken(3)},
},
want: &v1.Consistency{
Requirement: &v1.Consistency_AtLeastAsFresh{
AtLeastAsFresh: &v1.ZedToken{Token: MustIntZedToken(3)},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := http.Header{}
for k, v := range tt.headers {
for _, vv := range v {
h.Set(k, vv)
}
}
if got := ConsistencyFromHeaders(h); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ConsistencyFromHeaders() = %v, want %v", got, tt.want)
}
})
}
}

func TestWatchDelayFromHeaders(t *testing.T) {
tests := []struct {
name string
headers http.Header
want time.Duration
}{
{
name: "no headers",
want: time.Duration(0),
},
{
name: "bad fmt",
headers: map[string][]string{WatchDelayKey: {"123qwewe"}},
},
{
name: "set delay",
headers: map[string][]string{WatchDelayKey: {"6s"}},
want: time.Second * 6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := http.Header{}
for k, v := range tt.headers {
for _, vv := range v {
h.Set(k, vv)
}
}
if got := WatchDelayFromHeaders(h); got != tt.want {
t.Errorf("WatchDelayFromHeaders() = %v, want %v", got, tt.want)
}
})
}
}
6 changes: 6 additions & 0 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"regexp"
"slices"
"strings"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/kyverno/go-jmespath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -137,6 +139,8 @@ type ResolveInput struct {
Object *metav1.PartialObjectMetadata `json:"object"`
Body []byte `json:"body"`
Headers http.Header `json:"headers"`
Consistency *v1.Consistency `json:"consistency"`
WatchDelay time.Duration `json:"watchDelay"`
}

func NewResolveInputFromHttp(req *http.Request) (*ResolveInput, error) {
Expand Down Expand Up @@ -209,6 +213,8 @@ func NewResolveInput(req *request.RequestInfo, user *user.DefaultInfo, object *m
Object: object,
Body: body,
Headers: headers,
Consistency: ConsistencyFromHeaders(headers),
WatchDelay: WatchDelayFromHeaders(headers),
}
}

Expand Down
Loading