From a584fb713db30ec5056e6d45d1435ad38025b556 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 27 Sep 2024 19:08:40 +0530 Subject: [PATCH] chore: sync tables without acquiring read lock the whole time (#14179) --- pkg/storage/store_test.go | 1 + .../indexshipper/downloads/index_set.go | 5 ++++ .../shipper/indexshipper/downloads/table.go | 18 +++++++++++-- .../indexshipper/downloads/table_manager.go | 25 ++++++++++++++++--- .../shipper/indexshipper/downloads/util.go | 9 +++++++ 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index c509783d8661f..b1493089750a9 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1894,6 +1894,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { // recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup. store.Stop() + ResetBoltDBIndexClientsWithShipper() // there should be 2 index tables in the object storage indexTables, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index")) diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go index 8edd121071c5e..971dcb0fb65b3 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go @@ -283,6 +283,11 @@ func (t *indexSet) cleanupDB(fileName string) error { } func (t *indexSet) Sync(ctx context.Context) (err error) { + if !t.indexMtx.isReady() { + level.Info(t.logger).Log("msg", "skip sync since the index set is not ready") + return nil + } + return t.syncWithRetry(ctx, true, false) } diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table.go b/pkg/storage/stores/shipper/indexshipper/downloads/table.go index 1bae83c51e0e9..5b9f29c3a0c18 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/pkg/errors" + "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" @@ -271,9 +272,22 @@ func (t *table) Sync(ctx context.Context) error { level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.name)) t.indexSetsMtx.RLock() - defer t.indexSetsMtx.RUnlock() + users := maps.Keys(t.indexSets) + t.indexSetsMtx.RUnlock() + + for _, userID := range users { + if err := ctx.Err(); err != nil { + return err + } + + t.indexSetsMtx.RLock() + indexSet, ok := t.indexSets[userID] + t.indexSetsMtx.RUnlock() + + if !ok { + continue + } - for userID, indexSet := range t.indexSets { if err := indexSet.Sync(ctx); err != nil { return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name)) } diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go b/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go index 6b69272593784..3b4bc4bfb3fce 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "golang.org/x/exp/maps" "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" @@ -180,6 +181,10 @@ func (tm *tableManager) ForEach(ctx context.Context, tableName, userID string, c } func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) { + if tm.ctx.Err() != nil { + return nil, errors.New("table manager is stopping") + } + // if table is already there, use it. start := time.Now() tm.tablesMtx.RLock() @@ -214,7 +219,8 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) { func (tm *tableManager) syncTables(ctx context.Context) error { tm.tablesMtx.RLock() - defer tm.tablesMtx.RUnlock() + tables := maps.Keys(tm.tables) + tm.tablesMtx.RUnlock() start := time.Now() var err error @@ -231,11 +237,24 @@ func (tm *tableManager) syncTables(ctx context.Context) error { level.Info(tm.logger).Log("msg", "syncing tables") - for name, table := range tm.tables { + for _, name := range tables { + if err := ctx.Err(); err != nil { + return err + } + level.Debug(tm.logger).Log("msg", "syncing table", "table", name) start := time.Now() + + tm.tablesMtx.RLock() + table, ok := tm.tables[name] + tm.tablesMtx.RUnlock() + + if !ok { + continue + } + err := table.Sync(ctx) - duration := float64(time.Since(start)) + duration := time.Since(start).Seconds() if err != nil { tm.metrics.tableSyncLatency.WithLabelValues(name, statusFailure).Observe(duration) return errors.Wrapf(err, "failed to sync table '%s'", name) diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/util.go b/pkg/storage/stores/shipper/indexshipper/downloads/util.go index 457f76b3433d7..4c5fcfee1674d 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/util.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/util.go @@ -23,6 +23,15 @@ func (m *mtxWithReadiness) markReady() { close(m.ready) } +func (m *mtxWithReadiness) isReady() bool { + select { + case <-m.ready: + return true + default: + return false + } +} + func (m *mtxWithReadiness) awaitReady(ctx context.Context) error { ctx, cancel := context.WithTimeoutCause(ctx, 30*time.Second, errors.New("exceeded 30 seconds in awaitReady")) defer cancel()