Skip to content

Commit

Permalink
NK-588 Add storage index listing pagination support (#1276)
Browse files Browse the repository at this point in the history
  • Loading branch information
sesposito authored Oct 21, 2024
1 parent 5a8ff0c commit 8b0e9b9
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr

### Changed
- Increased limit on runtimes group users list functions.
- Added pagination support to storage index listing.

### Fixed
- Ensure matchmaker stats behave correctly if matchmaker becomes fully empty and idle.
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -74,5 +74,3 @@ require (
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
)

replace github.com/heroiclabs/nakama-common => ../nakama-common
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781 h1:mVjenNkPCNqQscldkvoBmmeCVS6MXzoN1rDd2u/+BbE=
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781/go.mod h1:lPG64MVCs0/tEkh311Cd6oHX9NLx2vAPx7WW7QCJHQ0=
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa h1:AWkD2AwYSMexj1mPArQvs2xkZTkvXvzqdvHlN4UeHOs=
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa/go.mod h1:lPG64MVCs0/tEkh311Cd6oHX9NLx2vAPx7WW7QCJHQ0=
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a h1:tuL2ZPaeCbNw8rXmV9ywd00nXRv95V4/FmbIGKLQJAE=
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a/go.mod h1:hzCTPoEi/oml2BllVydJcNP63S7b56e5DzeQeLGvw1U=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
12 changes: 7 additions & 5 deletions server/runtime_go_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,27 +2151,29 @@ func (n *RuntimeGoNakamaModule) StorageDelete(ctx context.Context, deletes []*ru
// @param queryString(type=string) Query to filter index entries.
// @param limit(type=int) Maximum number of results to be returned.
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param cursor(type=string) A cursor to fetch the next page of results.
// @return objects(*api.StorageObjectList) A list of storage objects.
// @return cursor(type=string) An optional cursor that can be used to retrieve the next page of records (if any).
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeGoNakamaModule) StorageIndexList(ctx context.Context, callerID, indexName, query string, limit int, order []string) (*api.StorageObjects, error) {
func (n *RuntimeGoNakamaModule) StorageIndexList(ctx context.Context, callerID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error) {
cid := uuid.Nil
if callerID != "" {
id, err := uuid.FromString(callerID)
if err != nil {
return nil, errors.New("expects caller id to be empty or a valid user id")
return nil, "", errors.New("expects caller id to be empty or a valid user id")
}
cid = id
}

if indexName == "" {
return nil, errors.New("expects a non-empty indexName")
return nil, "", errors.New("expects a non-empty indexName")
}

if limit < 1 || limit > 10_000 {
return nil, errors.New("limit must be 1-10000")
return nil, "", errors.New("limit must be 1-10000")
}

return n.storageIndex.List(ctx, cid, indexName, query, limit, order)
return n.storageIndex.List(ctx, cid, indexName, query, limit, order, cursor)
}

// @group users
Expand Down
45 changes: 29 additions & 16 deletions server/runtime_javascript_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (n *runtimeJavascriptNakamaModule) stringToBinary(r *goja.Runtime) func(goj
// @param limit(type=int) Maximum number of results to be returned.
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param callerId(type=string, optional=true) User ID of the caller, will apply permissions checks of the user. If empty defaults to system user and permission checks are bypassed.
// @return objects(nkruntime.StorageObjectList) A list of storage objects.
// @return objects(nkruntime.StorageIndexResult) A list of storage objects.
// @return error(error) An optional error value if an error occurred.
func (n *runtimeJavascriptNakamaModule) storageIndexList(r *goja.Runtime) func(goja.FunctionCall) goja.Value {
return func(f goja.FunctionCall) goja.Value {
Expand Down Expand Up @@ -391,39 +391,52 @@ func (n *runtimeJavascriptNakamaModule) storageIndexList(r *goja.Runtime) func(g
callerID = cid
}

objectList, err := n.storageIndex.List(n.ctx, callerID, idxName, queryString, int(limit), order)
var cursor string
if !goja.IsUndefined(f.Argument(5)) && !goja.IsNull(f.Argument(5)) {
cursor = getJsString(r, f.Argument(5))
}

objectList, newCursor, err := n.storageIndex.List(n.ctx, callerID, idxName, queryString, int(limit), order, cursor)
if err != nil {
panic(r.NewGoError(fmt.Errorf("failed to lookup storage index: %s", err.Error())))
}

objects := make([]interface{}, 0, len(objectList.Objects))
objects := make([]any, 0, len(objectList.Objects))
for _, o := range objectList.Objects {
objectMap := make(map[string]interface{}, 9)
objectMap["key"] = o.Key
objectMap["collection"] = o.Collection
obj := r.NewObject()
_ = obj.Set("key", o.Key)
_ = obj.Set("collection", o.Collection)
if o.UserId != "" {
objectMap["userId"] = o.UserId
_ = obj.Set("userId", o.UserId)
} else {
objectMap["userId"] = nil
_ = obj.Set("userId", nil)
}
objectMap["version"] = o.Version
objectMap["permissionRead"] = o.PermissionRead
objectMap["permissionWrite"] = o.PermissionWrite
objectMap["createTime"] = o.CreateTime.Seconds
objectMap["updateTime"] = o.UpdateTime.Seconds
_ = obj.Set("version", o.Version)
_ = obj.Set("permissionRead", o.PermissionRead)
_ = obj.Set("permissionWrite", o.PermissionWrite)
_ = obj.Set("createTime", o.CreateTime.Seconds)
_ = obj.Set("updateTime", o.UpdateTime.Seconds)

valueMap := make(map[string]interface{})
err = json.Unmarshal([]byte(o.Value), &valueMap)
if err != nil {
panic(r.NewGoError(fmt.Errorf("failed to convert value to json: %s", err.Error())))
}
pointerizeSlices(valueMap)
objectMap["value"] = valueMap
_ = obj.Set("value", valueMap)

objects = append(objects, objectMap)
objects = append(objects, obj)
}

outObj := r.NewObject()
_ = outObj.Set("objects", r.NewArray(objects...))
if newCursor != "" {
_ = outObj.Set("cursor", newCursor)
} else {
_ = outObj.Set("cursor", goja.Null())
}

return r.ToValue(objects)
return r.ToValue(outObj)
}
}

Expand Down
13 changes: 11 additions & 2 deletions server/runtime_lua_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -10391,6 +10391,7 @@ func (n *RuntimeLuaNakamaModule) channelIdBuild(l *lua.LState) int {
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param callerId(type=string, optional=true) User ID of the caller, will apply permissions checks of the user. If empty defaults to system user and permission checks are bypassed.
// @return objects(table) A list of storage objects.
// @return objects(string) A cursor, if there's a next page of results, nil otherwise.
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
idxName := l.CheckString(1)
Expand Down Expand Up @@ -10421,7 +10422,9 @@ func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
callerID = cid
}

objectList, err := n.storageIndex.List(l.Context(), callerID, idxName, queryString, limit, order)
cursor := l.OptString(6, "")

objectList, newCursor, err := n.storageIndex.List(l.Context(), callerID, idxName, queryString, limit, order, cursor)
if err != nil {
l.RaiseError(err.Error())
return 0
Expand Down Expand Up @@ -10456,7 +10459,13 @@ func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
}
l.Push(lv)

return 1
if newCursor != "" {
l.Push(lua.LString(newCursor))
} else {
l.Push(lua.LNil)
}

return 2
}

// @group satori
Expand Down
84 changes: 72 additions & 12 deletions server/storage_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package server

import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"slices"
"time"

"github.com/blugelabs/bluge"
Expand All @@ -35,7 +39,7 @@ import (
type StorageIndex interface {
Write(ctx context.Context, objects []*api.StorageObject) (creates int, deletes int)
Delete(ctx context.Context, objects StorageOpDeletes) (deletes int)
List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string) (*api.StorageObjects, error)
List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error)
Load(ctx context.Context) error
CreateIndex(ctx context.Context, name, collection, key string, fields []string, sortFields []string, maxEntries int, indexOnly bool) error
RegisterFilters(runtime *Runtime)
Expand Down Expand Up @@ -218,10 +222,17 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, objects StorageOpDelete
return deletes
}

func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string) (*api.StorageObjects, error) {
type indexListCursor struct {
Query string
Offset int
Limit int
Order []string
}

func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error) {
idx, found := si.indexByName[indexName]
if !found {
return nil, fmt.Errorf("index %q not found", indexName)
return nil, "", fmt.Errorf("index %q not found", indexName)
}

if limit > idx.MaxEntries {
Expand All @@ -232,34 +243,83 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index
query = "*"
}

var idxCursor *indexListCursor
if cursor != "" {
idxCursor = &indexListCursor{}
cb, err := base64.RawURLEncoding.DecodeString(cursor)
if err != nil {
si.logger.Error("Could not base64 decode notification cursor.", zap.String("cursor", cursor))
return nil, "", errors.New("invalid cursor")
}
if err := gob.NewDecoder(bytes.NewReader(cb)).Decode(idxCursor); err != nil {
si.logger.Error("Could not decode notification cursor.", zap.String("cursor", cursor))
return nil, "", errors.New("invalid cursor")
}

if query != idxCursor.Query {
return nil, "", fmt.Errorf("invalid cursor: query mismatch")
}
if limit != idxCursor.Limit {
return nil, "", fmt.Errorf("invalid cursor: limit mismatch")
}
if !slices.Equal(order, idxCursor.Order) {
return nil, "", fmt.Errorf("invalid cursor: order mismatch")
}
}

parsedQuery, err := ParseQueryString(query)
if err != nil {
return nil, err
return nil, "", err
}

searchReq := bluge.NewTopNSearch(limit, parsedQuery)
searchReq := bluge.NewTopNSearch(limit+1, parsedQuery)

if len(order) != 0 {
searchReq.SortBy(order)
}

if idxCursor != nil {
searchReq.SetFrom(idxCursor.Offset)
}

indexReader, err := idx.Index.Reader()
if err != nil {
return nil, err
return nil, "", err
}

results, err := indexReader.Search(ctx, searchReq)
if err != nil {
return nil, err
return nil, "", err
}

indexResults, err := si.queryMatchesToStorageIndexResults(results)
if err != nil {
return nil, err
return nil, "", err
}

var newCursor string
if len(indexResults) > limit {
indexResults = indexResults[:len(indexResults)-1]
offset := 0
if idxCursor != nil {
offset = idxCursor.Offset
}
newIdxCursor := &indexListCursor{
Query: query,
Offset: offset + limit,
Limit: limit,
Order: order,
}
cursorBuf := new(bytes.Buffer)
if err := gob.NewEncoder(cursorBuf).Encode(newIdxCursor); err != nil {
si.logger.Error("Failed to create new cursor.", zap.Error(err))
return nil, "", err
}
newCursor = base64.RawURLEncoding.EncodeToString(cursorBuf.Bytes())
}

if len(indexResults) == 0 {
return &api.StorageObjects{Objects: []*api.StorageObject{}}, nil
return &api.StorageObjects{Objects: []*api.StorageObject{}}, "", nil
}

if !si.config.DisableIndexOnly && idx.IndexOnly {
Expand All @@ -282,7 +342,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index
})
}

return &api.StorageObjects{Objects: objects}, nil
return &api.StorageObjects{Objects: objects}, newCursor, nil
}

storageReads := make([]*api.ReadStorageObjectId, 0, len(indexResults))
Expand All @@ -296,7 +356,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index

objects, err := StorageReadObjects(ctx, si.logger, si.db, callerID, storageReads)
if err != nil {
return nil, err
return nil, "", err
}

// Sort the objects read from the db according to the results from the index as StorageReadObjects does not guarantee ordering of the results
Expand All @@ -315,7 +375,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index

objects.Objects = sortedObjects

return objects, nil
return objects, newCursor, nil
}

func (si *LocalStorageIndex) Load(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 8b0e9b9

Please sign in to comment.