diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index b63e5574f6..b600f372bc 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -188,6 +188,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { MigrationPhase(config.migrationPhase), )) + t.Run("TestSerializationError", createDatastoreTest( + b, + SerializationErrorTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) + t.Run("TestStrictReadMode", createReplicaDatastoreTest( b, StrictReadModeTest, @@ -270,6 +279,36 @@ func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datasto } } +func SerializationErrorTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + updates := []tuple.RelationshipUpdate{ + tuple.Create(tuple.MustParse("resource:resource#reader@user:user#...")), + } + rwt.(*pgReadWriteTXN).tx = txWithSerializationError{rwt.(*pgReadWriteTXN).tx} + return rwt.WriteRelationships(ctx, updates) + }) + + require.Contains(err.Error(), "unable to write relationships due to a serialization error") +} + +type txWithSerializationError struct { + pgx.Tx +} + +func (txwse txWithSerializationError) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) { + return pgconn.CommandTag{}, &pgconn.PgError{ + Code: pgSerializationFailure, + Message: "fake serialization error", + } +} + func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { require := require.New(t) diff --git a/internal/datastore/postgres/readwrite.go b/internal/datastore/postgres/readwrite.go index 48b93c7471..69a16224df 100644 --- a/internal/datastore/postgres/readwrite.go +++ b/internal/datastore/postgres/readwrite.go @@ -18,6 +18,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jzelinskie/stringz" + "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -123,7 +124,7 @@ func (rwt *pgReadWriteTXN) collectSimplifiedTouchTypes(ctx context.Context, muta namespaces, err := rwt.LookupNamespacesWithNames(ctx, touchedResourceNamespaces.AsSlice()) if err != nil { - return nil, fmt.Errorf(errUnableToWriteRelationships, err) + return nil, handleWriteError(err) } if len(namespaces) == 0 { @@ -148,7 +149,7 @@ func (rwt *pgReadWriteTXN) collectSimplifiedTouchTypes(ctx context.Context, muta vts, err := typesystem.NewNamespaceTypeSystem(nsDef, typesystem.ResolverForDatastoreReader(rwt)) if err != nil { - return nil, fmt.Errorf(errUnableToWriteRelationships, err) + return nil, handleWriteError(err) } notAllowed, err := vts.RelationDoesNotAllowCaveatsForSubject(rel.Resource.Relation, rel.Subject.ObjectType) @@ -211,12 +212,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t if hasCreateInserts { sql, args, err := createInserts.ToSql() if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } _, err = rwt.tx.Exec(ctx, sql, args...) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } } @@ -234,12 +235,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t sql, args, err := touchInserts.ToSql() if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } rows, err := rwt.tx.Query(ctx, sql, args...) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } defer rows.Close() @@ -263,7 +264,7 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t &subjectRelation, ) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } rel := tuple.Relationship{ @@ -329,12 +330,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t Set(colDeletedXid, rwt.newXID). ToSql() if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } rows, err := rwt.tx.Query(ctx, sql, args...) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } defer rows.Close() @@ -359,7 +360,7 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t &subjectRelation, ) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } deletedTpl := tuple.Relationship{ @@ -397,17 +398,25 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t // Otherwise execute the INSERTs for the caveated-changes TOUCHed relationships. sql, args, err = touchWrite.ToSql() if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } _, err = rwt.tx.Exec(ctx, sql, args...) if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) + return handleWriteError(err) } return nil } +func handleWriteError(err error) error { + if pgxcommon.IsSerializationError(err) { + return common.NewSerializationError(fmt.Errorf("unable to write relationships due to a serialization error: [%w]; this typically indicates that a number of write transactions are contending over the same relationships; either reduce the contention or scale this Postgres instance", err)) + } + + return fmt.Errorf(errUnableToWriteRelationships, err) +} + func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {