Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create index method #125

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,44 @@ func (db *MemDB) Snapshot() *MemDB {
return clone
}

// CreateIndexes adds the given indexes to an existing table after the
// database has already been created, and populates them from the objects
// currently stored in that table.
//
// The steps, in order, are:
//  1. take db.writer so no other writer runs concurrently,
//  2. insert one empty radix tree per new index into the root tree and
//     publish the resulting root through db.root,
//  3. open a write transaction on that root and let Txn.CreateIndexes fill
//     the new index trees,
//  4. commit, then register each IndexSchema in
//     db.schema.Tables[table].Indexes.
//
// It returns whatever error Txn.CreateIndexes reports (for example an
// unknown table), and nil on success.
//
// NOTE(review): db.schema is mutated in place in step 4. If readers consult
// db.schema without holding db.writer, this method must not be called
// concurrently with any reads — confirm and keep this requirement documented.
//
// NOTE(review): the root containing the new, empty index trees is stored in
// db.root before the table name is validated; on an error return nothing in
// this function restores the previous root. Presumably the deferred Abort /
// the Commit release db.writer — confirm against Txn.Abort and Txn.Commit.
func (db *MemDB) CreateIndexes(table string, schema ...*IndexSchema) error {
// Take the writer lock first; the transaction itself is only created once
// the root tree already contains an edge for every new index.
db.writer.Lock()

// Create an edge (an empty index tree) in the root for each index to create.
root := db.getRoot()
for _, indexSchema := range schema {
index := iradix.New()
path := indexPath(table, indexSchema.Name)
// Only the new root is kept; the other two results of Insert are discarded.
root, _, _ = root.Insert(path, index)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why the new indexes are added to the root tree outside of the transaction below? If there are 10 indexes for example this would be 10 separate root tree transactions. I guess there is no way these insertions can "fail" so it's not really a correctness issue but leaves me wondering if there is some subtle reason I don't understand for it to be this way instead of creating the transaction above and using it?

}
db.root = unsafe.Pointer(root)

// now we can create the transaction
transaction := &Txn{
db: db,
write: true,
rootTxn: db.getRoot().Txn(),
}
defer transaction.Abort()

// create the requested indexes
if err := transaction.CreateIndexes(table, schema...); err != nil {
return err
}

transaction.Commit()

// copy the new created indexes into our schema
tableSchema := db.schema.Tables[table]
for _, indexSchema := range schema {
tableSchema.Indexes[indexSchema.Name] = indexSchema
}
Comment on lines +128 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm this seems like a race footgun that we should at least document in the doc comment for this method.

I think it's true that today we never expect db.schema to change after initialization therefore readers will read from it without taking the db.writer.Lock which would be a data race if there were any readers concurrent with this call.

I think that's probably OK for it's intended usage and adding a lock to every read would be too invasive and impact performance for all users, but we should at least point out the requirement in the doc comments that this MUST NOT be called concurrently with any reads. For it's intended purpose that would probably be a logical bug anyway since the indexes the readers presumably need wouldn't exist yet but it might not be obvious to other library users!


return nil
}

// initialize is used to setup the DB for use after creation. This should
// be called only once after allocating a MemDB.
func (db *MemDB) initialize() error {
Expand Down
36 changes: 36 additions & 0 deletions memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,39 @@ func TestMemDB_Snapshot(t *testing.T) {
t.Fatalf("should exist")
}
}

// TestMemDB_CreateIndexes verifies that an index added with
// MemDB.CreateIndexes after the database was created is registered under its
// own name and is populated from objects that were already stored.
func TestMemDB_CreateIndexes(t *testing.T) {
	const indexName = "FooIndex"

	db, err := NewMemDB(testValidSchema())
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Store one object before the index exists so that CreateIndexes has
	// something to back-fill.
	obj := testObj()
	txn := db.Txn(true)
	if err := txn.Insert("main", obj); err != nil {
		t.Fatalf("err: %v", err)
	}
	txn.Commit()

	err = db.CreateIndexes("main", &IndexSchema{
		Name:         indexName,
		AllowMissing: true,
		Unique:       false,
		Indexer: &StringFieldIndex{
			Field:     "Foo",
			Lowercase: false,
		},
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Iterate the newly created index itself. Looking up an index that is
	// already part of the fixture schema would pass even if CreateIndexes
	// did nothing.
	idxIter, _, err := db.Txn(false).getIndexIterator("main", indexName)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	iter := &radixIterator{iter: idxIter}
	if iter.Next() == nil {
		t.Fatalf("next should exist")
	}
}
135 changes: 101 additions & 34 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,60 @@ func (txn *Txn) Commit() {
}
}

// CreateIndexes populates the given indexes of table from every object that
// is currently stored in it.
//
// It walks all entries of the table's id index and, for each object, computes
// the key(s) for every requested IndexSchema via createIndexValue and inserts
// the object under those keys into the corresponding writable index tree.
//
// An error is returned when the transaction is read-only, when the table is
// not part of the schema, when the id iterator cannot be created, when the
// primary id of an object cannot be derived, or when createIndexValue fails
// (indexer error, or a missing value on an index without AllowMissing).
//
// NOTE(review): no uniqueness check is visible here; values are written with
// indexTxn.Insert and its results are ignored, so for a Unique index a later
// object with the same key would presumably replace an earlier one — confirm.
func (txn *Txn) CreateIndexes(table string, indexes ...*IndexSchema) error {
if !txn.write {
return fmt.Errorf("cannot create index in read-only transaction")
}

// Look up the schema of the table the indexes belong to.
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return fmt.Errorf("invalid table '%s'", table)
}

// Fetch the indexer of the id index (presumably the primary key index —
// `id` is declared outside this view). The type assertion is unchecked, so
// it panics if that indexer is not a SingleIndexer.
idSchema := tableSchema.Indexes[id]
idIndexer := idSchema.Indexer.(SingleIndexer)

// Iterate over all entries of the table through its id index.
indexIterator, _, err := txn.getIndexIterator(table, id)
if err != nil {
return err
}
iter := &radixIterator{iter: indexIterator}

// For every stored object, add an entry to each of the requested indexes.
for next := iter.Next(); next != nil; next = iter.Next() {

// Derive the primary id of the object; createIndexValue appends it to the
// keys of non-unique indexes.
ok, idVal, err := idIndexer.FromObject(next)
if err != nil {
return fmt.Errorf("failed to build primary index: %v", err)
}
if !ok {
return fmt.Errorf("object missing primary index")
}

for _, indexSchema := range indexes {
indexTxn := txn.writableIndex(table, indexSchema.Name)

var vals [][]byte
vals, err = txn.createIndexValue(indexSchema, idVal, next)
if err != nil {
return err
}

// Store the object under each computed key. vals is nil when the object
// has no value for an AllowMissing index, in which case nothing is written.
for _, val := range vals {
indexTxn.Insert(val, next)
}
}
Comment on lines +201 to +223
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you factored out txn.createIndexValue from Insert rather than copying it which makes sense, but unless I'm missing something, isn't the whole of this block identical to the Insert code? Could we factor out the whole thing? Or even just call txn.Insert directly? I guess that does slightly more work by replacing the ID index with existing values too which is a no-op but wasted effort.

I'd be interested to know if you measure significnatly worse times from just calling Insert for your use-case. The benefit is reduced duplication and no change to the current code path every other caller of this library is using. The change is straightforward so it's not high risk, but it's more than no change!

}

return nil
}

// Insert is used to add or update an object into the given table.
//
// When updating an object, the obj provided should be a copy rather
Expand Down Expand Up @@ -209,31 +263,10 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
for name, indexSchema := range tableSchema.Indexes {
indexTxn := txn.writableIndex(table, name)

// Determine the new index value
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(obj)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(obj)
}
var vals [][]byte
vals, err = txn.createIndexValue(indexSchema, idVal, obj)
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}

// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if ok && !indexSchema.Unique {
for i := range vals {
vals[i] = append(vals[i], idVal...)
}
return err
}

// Handle the update by deleting from the index first
Expand Down Expand Up @@ -273,21 +306,12 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
}
}

// If there is no index value, either this is an error or an expected
// case and we can skip updating
if !ok {
if indexSchema.AllowMissing {
continue
} else {
return fmt.Errorf("missing value for index '%s'", name)
}
}

// Update the value of the index
for _, val := range vals {
indexTxn.Insert(val, obj)
}
}

if txn.changes != nil {
txn.changes = append(txn.changes, Change{
Table: table,
Expand All @@ -296,6 +320,7 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
primaryKey: idVal,
})
}

return nil
}

Expand Down Expand Up @@ -650,6 +675,48 @@ func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interfa
return nil, nil
}

// createIndexValue computes the keys under which obj is stored in the index
// described by indexSchema.
//
// The schema's Indexer is asked for the value(s) of obj: a SingleIndexer
// yields one key, a MultiIndexer yields several. For a non-unique index the
// primary key idVal is appended to every key so that entries stay distinct.
//
// It returns (nil, nil) when obj has no value for the index and the schema
// sets AllowMissing, and an error when the indexer fails or when the value is
// missing on an index that does not allow that.
func (txn *Txn) createIndexValue(indexSchema *IndexSchema, idVal []byte, obj interface{}) ([][]byte, error) {
	var (
		found bool
		keys  [][]byte
		err   error
	)

	// Ask the indexer for the raw key(s) of the object.
	switch idx := indexSchema.Indexer.(type) {
	case SingleIndexer:
		var key []byte
		found, key, err = idx.FromObject(obj)
		keys = [][]byte{key}
	case MultiIndexer:
		found, keys, err = idx.FromObject(obj)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to build index '%s': %v", indexSchema.Name, err)
	}

	// No value for this index: either tolerated by the schema or an error.
	if !found {
		if indexSchema.AllowMissing {
			return nil, nil
		}
		return nil, fmt.Errorf("missing value for index '%s'", indexSchema.Name)
	}

	// A non-unique index is made unique by suffixing each key with the
	// primary key, which has to be unique anyway.
	if !indexSchema.Unique {
		for i, key := range keys {
			keys[i] = append(key, idVal...)
		}
	}

	return keys, nil
}

// getIndexValue is used to get the IndexSchema and the value
// used to scan the index given the parameters. This handles prefix based
// scans when the index has the "_prefix" suffix. The index must support
Expand Down