Skip to content

Commit

Permalink
Merge pull request #1110 from tigrisdata/main
Browse files Browse the repository at this point in the history
Beta release
  • Loading branch information
efirs authored Apr 27, 2023
2 parents ced543a + f85ed8f commit c1e7c00
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 53 deletions.
20 changes: 20 additions & 0 deletions schema/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,26 @@ func (i *Index) IsCompatible(i1 *Index) error {
return nil
}

func HasIndex(indexes []*Index, idx *Index) bool {
for _, index := range indexes {
if index.Name == idx.Name {
return true
}
}

return false
}

func FindIndex(indexes []*Index, name string) *Index {
for _, idx := range indexes {
if idx.Name == name {
return idx
}
}

return nil
}

type FieldBuilder struct {
FieldName string
Description string `json:"description,omitempty"`
Expand Down
32 changes: 7 additions & 25 deletions server/metadata/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,20 @@ func (c *CollectionSubspace) Create(ctx context.Context, tx transaction.Tx, nsID
func (c *CollectionSubspace) updateMetadataIndexes(ctx context.Context, tx transaction.Tx, nsID uint32, dbID uint32, name string, id uint32, metadata *CollectionMetadata, updatedIndexes []*schema.Index,
) error {
for _, updateIdx := range updatedIndexes {
if !hasIndex(metadata.Indexes, updateIdx) {
existingIdx := schema.FindIndex(metadata.Indexes, updateIdx.Name)
if existingIdx != nil {
if updateIdx.State == schema.UNKNOWN {
updateIdx.State = existingIdx.State
}
} else {
updateIdx.State = schema.INDEX_WRITE_MODE
if err := c.createBuildIndexTask(ctx, tx, nsID, dbID, name, id, updateIdx); err != nil {
return err
}
metadata.Indexes = append(metadata.Indexes, updateIdx)
}
}

for _, existing := range metadata.Indexes {
if !hasIndex(updatedIndexes, existing) {
existing.State = schema.INDEX_DELETED
if err := c.createDeleteIndexTask(ctx, tx, nsID, dbID, name, id, existing); err != nil {
return err
}
}
}

metadata.Indexes = updatedIndexes
return nil
}

Expand Down Expand Up @@ -129,10 +125,6 @@ func (c *CollectionSubspace) createBuildIndexTask(ctx context.Context, tx transa
return nil
}

func (c *CollectionSubspace) createDeleteIndexTask(ctx context.Context, tx transaction.Tx, nsID uint32, dbID uint32, name string, id uint32, index *schema.Index) error {
return nil
}

func (c *CollectionSubspace) insert(ctx context.Context, tx transaction.Tx, nsID uint32, dbID uint32, name string,
metadata *CollectionMetadata,
) error {
Expand Down Expand Up @@ -248,13 +240,3 @@ func (c *CollectionSubspace) list(ctx context.Context, tx transaction.Tx, namesp

return collections, nil
}

func hasIndex(indexes []*schema.Index, idx *schema.Index) bool {
for _, index := range indexes {
if index.Name == idx.Name {
return true
}
}

return false
}
4 changes: 2 additions & 2 deletions server/metadata/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ func TestCollectionWithIndexes(t *testing.T) {

updatedMeta, err := c.Update(ctx, tx, 1, 1, "name5", 1, idxsUpdated)
require.NoError(t, err)
require.Len(t, updatedMeta.Indexes, 2)
require.Equal(t, updatedMeta.Indexes[1].State, schema.INDEX_DELETED)
require.Len(t, updatedMeta.Indexes, 1)
require.Equal(t, updatedMeta.Indexes[0].State, schema.INDEX_ACTIVE)
})

t.Run("list", func(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions server/metadata/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,15 @@ func (tenant *Tenant) UpdateCollectionIndexes(ctx context.Context, tx transactio
return nil
}

func (tenant *Tenant) GetCollectionMetadata(ctx context.Context, tx transaction.Tx, db *Database, collectionName string) (*CollectionMetadata, error) {
metadata, err := tenant.metaStore.Collection().Get(ctx, tx, tenant.namespace.Id(), db.id, collectionName)
if err != nil {
return nil, err
}

return metadata, nil
}

// DropCollection is to drop a collection and its associated indexes. It removes the "created" entry from the encoding
// subspace and adds a "dropped" entry for the same collection key.
func (tenant *Tenant) DropCollection(ctx context.Context, tx transaction.Tx, db *Database, collectionName string) error {
Expand Down
22 changes: 22 additions & 0 deletions server/services/v1/database/collection_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ func (runner *CollectionQueryRunner) createOrUpdate(ctx context.Context, tx tran
return Response{}, ctx, err
}

var oldMetadata *metadata.CollectionMetadata
if db.GetCollection(req.GetCollection()) == nil {
collectionExists = true
oldMetadata, err = tenant.GetCollectionMetadata(ctx, tx, db, req.GetCollection())
if err != nil && err != errors.ErrNotFound {
return Response{}, ctx, createApiError(err)
}
}

if !collectionExists && req.OnlyCreate {
Expand Down Expand Up @@ -136,6 +141,23 @@ func (runner *CollectionQueryRunner) createOrUpdate(ctx context.Context, tx tran
}
if collectionExists {
countDDLCreateUnit(ctx)

if config.DefaultConfig.SecondaryIndex.WriteEnabled && oldMetadata != nil {
updatedMetadata, err := tenant.GetCollectionMetadata(ctx, tx, db, req.GetCollection())
if err != nil {
return Response{}, nil, createApiError(err)
}

indexer := NewSecondaryIndexer(db.GetCollection(req.GetCollection()))

for _, oldIndex := range oldMetadata.Indexes {
if !schema.HasIndex(updatedMetadata.Indexes, oldIndex) {
if err = indexer.DeleteIndex(ctx, tx, oldIndex); err != nil {
return Response{}, nil, createApiError(err)
}
}
}
}
} else {
countDDLUpdateUnit(ctx, true)
}
Expand Down
8 changes: 8 additions & 0 deletions server/services/v1/database/secondary_index_measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ func (m *secondaryIndexerWithMetrics) Update(ctx context.Context, tx transaction
return
}

func (m *secondaryIndexerWithMetrics) DeleteIndex(ctx context.Context, tx transaction.Tx, index *schema.Index) (err error) {
m.measure(ctx, "DeleteIndex", func(ctx context.Context) error {
err = m.q.DeleteIndex(ctx, tx, index)
return err
})
return
}

type secondaryIndexReaderWithMetrics struct {
reader *SecondaryIndexReaderImpl
}
Expand Down
8 changes: 8 additions & 0 deletions server/services/v1/database/secondary_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type SecondaryIndexer interface {
Index(ctx context.Context, tx transaction.Tx, td *internal.TableData, primaryKey []interface{}) error
// Update an existing document in the secondary index
Update(ctx context.Context, tx transaction.Tx, newTd *internal.TableData, oldTd *internal.TableData, primaryKey []interface{}) error
// Delete the KVS for an index
DeleteIndex(ctx context.Context, tx transaction.Tx, index *schema.Index) error
}

type IndexRow struct {
Expand Down Expand Up @@ -245,6 +247,12 @@ func shouldRetryBulkIndex(err error) bool {
return false
}

func (q *SecondaryIndexerImpl) DeleteIndex(ctx context.Context, tx transaction.Tx, index *schema.Index) error {
indexKey := keys.NewKey(q.coll.EncodedTableIndexName, q.coll.SecondaryIndexKeyword(), KVSubspace, index.Name)

return tx.Delete(ctx, indexKey)
}

func (q *SecondaryIndexerImpl) scanIndex(ctx context.Context, tx transaction.Tx) (kv.Iterator, error) {
start := keys.NewKey(q.coll.EncodedTableIndexName, q.coll.SecondaryIndexKeyword(), KVSubspace)
end := keys.NewKey(q.coll.EncodedTableIndexName, q.coll.SecondaryIndexKeyword(), KVSubspace, 0xFF)
Expand Down
59 changes: 59 additions & 0 deletions server/services/v1/database/secondary_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,65 @@ func TestIndexingStoreAndGetSimpleKVsforDoc(t *testing.T) {
assert.Equal(t, 8, count)
assert.NoError(t, tx.Commit(ctx))
})

t.Run("delete index", func(t *testing.T) {
coll := indexStore.coll
_ = kvStore.DropTable(ctx, coll.EncodedTableIndexName)
tx, err := tm.StartTx(ctx)
assert.NoError(t, err)

info, err := indexStore.IndexInfo(ctx, tx)
assert.NoError(t, err)
assert.Equal(t, int64(0), info.Rows)
assert.Equal(t, info.Size, int64(0))

for i := 0; i < 5; i++ {
td, pk := createDoc(`{"id":1, "double_f":2,"created":"2023-01-16T12:55:17.304154Z","updated": "2023-01-16T12:55:17.304154Z", "arr":[1]}`, []interface{}{i}...)
err = indexStore.Index(ctx, tx, td, pk)
assert.NoError(t, err)
}

info, err = indexStore.IndexInfo(ctx, tx)
assert.NoError(t, err)
assert.Equal(t, int64(35), info.Rows)
err = tx.Commit(ctx)
assert.NoError(t, err)

tx, err = tm.StartTx(ctx)
assert.NoError(t, err)
indexMeta := schema.Index{
Id: 1,
Name: "double_f",
Fields: []*schema.Field{
{
FieldName: "double_f",
},
},
}
assert.NoError(t, indexStore.DeleteIndex(ctx, tx, &indexMeta))

err = tx.Commit(ctx)
assert.NoError(t, err)

tx, err = tm.StartTx(ctx)
assert.NoError(t, err)

info, err = indexStore.IndexInfo(ctx, tx)
assert.NoError(t, err)
assert.Equal(t, int64(30), info.Rows)

iter, err := indexStore.scanIndex(ctx, tx)
assert.NoError(t, err)

count := 0
var row kv.KeyValue
for iter.Next(&row) {
count += 1
}
assert.NoError(t, err)
assert.Nil(t, iter.Err())
assert.Equal(t, 30, count)
})
}

func TestBulkIndexing(t *testing.T) {
Expand Down
Loading

0 comments on commit c1e7c00

Please sign in to comment.