Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Broadcast min_commit_ts for pipelined transactions #1458

Merged
merged 24 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
75892ad
feat: periodic updates of min_commit_ts and broadcast
ekexium Sep 4, 2024
5588fdb
feat: periodic updates of full store list
ekexium Sep 5, 2024
294e614
test: unit test for minCommitTsManager
ekexium Sep 5, 2024
516aa22
fix: set cluster id for broadcast requests
ekexium Sep 5, 2024
fba423c
fix: set resource group related context
ekexium Sep 6, 2024
5adc6c9
limit the update to non-async-commit and non-1pc txns
ekexium Sep 11, 2024
61545b0
broadcast txn status when commit and rollback
ekexium Sep 18, 2024
f3909ea
rename minCommiTS to minCommitTSMgr; refactor broadcastToAllStores
ekexium Sep 19, 2024
ad0302e
fix test
ekexium Sep 19, 2024
d2213c5
Merge branch 'master' of github.com:tikv/client-go into feat-resolved…
ekexium Sep 19, 2024
a98a443
Update txnkv/transaction/2pc.go
ekexium Sep 19, 2024
7bad9fd
refactor broadcastToAllStores to use store goroutine pool
ekexium Sep 19, 2024
4f23a9e
set refreshStoreListInterval to 10s
ekexium Sep 19, 2024
1a9447b
fix: change a read lock to write lock
ekexium Sep 20, 2024
e611350
Apply suggestions from code review
ekexium Sep 23, 2024
f165547
fix mistaken renaming
ekexium Sep 23, 2024
ecb33d5
Update txnkv/transaction/pipelined_flush.go
ekexium Sep 23, 2024
9de2ac9
fix rpc code
ekexium Sep 23, 2024
5fece2a
Merge branch 'feat-resolved-ts-for-large-txn' of github.com:ekexium/c…
ekexium Sep 23, 2024
ca93ef6
comment the check of return values of GetAllStores
ekexium Sep 24, 2024
4a9b779
update kvproto
ekexium Sep 24, 2024
4aada7a
apply suggestions from review
ekexium Sep 24, 2024
012425a
Merge branch 'master' of github.com:tikv/client-go into feat-resolved…
ekexium Sep 24, 2024
ea8180b
update integration_tests dependency
ekexium Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37 h1:wwoMiZm/cK9466IX3QUPgogxH+HsTuAVAouHLlBJLh0=
github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -74,8 +76,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
40 changes: 40 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,48 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
// cache GC is incompatible with cache refresh
c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
}
c.bg.schedule(
func(ctx context.Context, _ time.Time) bool {
refreshFullStoreList(ctx, c.PDClient(), c.stores)
return false
}, refreshStoreListInterval,
)
return c
}

// Try to refresh full store list. Errors are ignored.
func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeCache) {
storeList, err := pdClient.GetAllStores(ctx)
ekexium marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
return
}
for _, store := range storeList {
_, exist := stores.get(store.GetId())
if exist {
continue
}
s := stores.getOrInsertDefault(store.GetId())
// GetAllStores is supposed to return only Up and Offline stores.
// This check is being defensive and to make it consistent with store resolve code.
if store == nil || store.GetState() == metapb.StoreState_Tombstone {
s.setResolveState(tombstone)
continue
}
addr := store.GetAddress()
if addr == "" {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to explain in the comments when the pdClient.GetAllStores(ctx) may return a store list containg nil store and emptry address and how should they be treated. For example, dose it mean the returned store list is invalid and pdClient.GetAllStores(ctx) should be retried when there invalid element in the list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that GetAllStores is supposed to return only stores in Up and Offline states. This additional check serves as a defensive measure and ensures consistency with the store resolve procedure

}
ekexium marked this conversation as resolved.
Show resolved Hide resolved
// TODO: maybe refactor this, together with other places initializing Store
s.addr = addr
s.peerAddr = store.GetPeerAddress()
s.saddr = store.GetStatusAddress()
s.storeType = tikvrpc.GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
s.changeResolveStateTo(unresolved, resolved)
}
}

// only used fot test.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
Expand Down Expand Up @@ -2649,6 +2688,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV

const cleanCacheInterval = time.Second
const cleanRegionNumPerRound = 50
const refreshStoreListInterval = 10 * time.Second

// gcScanItemHook is only used for testing
var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
Expand Down
4 changes: 4 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvr
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
4 changes: 2 additions & 2 deletions tikv/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ const unsafeDestroyRangeTimeout = 5 * time.Minute
// multiple times on an single range.
func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKey []byte) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := s.listStoresForUnsafeDestory(ctx)
stores, err := s.listStoresForUnsafeDestroy(ctx)
if err != nil {
metrics.TiKVUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("get_stores").Inc()
return err
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKe
return nil
}

func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Store, error) {
func (s *KVStore) listStoresForUnsafeDestroy(ctx context.Context) ([]*metapb.Store, error) {
stores, err := s.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
1 change: 0 additions & 1 deletion tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type testKVSuite struct {
func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) {
return 0, nil, nil
})
Expand Down
13 changes: 13 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
CmdLockWaitInfo

CmdGetHealthFeedback
CmdBroadcastTxnStatus

CmdCop CmdType = 512 + iota
CmdCopStream
Expand Down Expand Up @@ -221,6 +222,8 @@ func (t CmdType) String() string {
return "LockWaitInfo"
case CmdGetHealthFeedback:
return "GetHealthFeedback"
case CmdBroadcastTxnStatus:
return "BroadcastTxnStatus"
case CmdFlashbackToVersion:
return "FlashbackToVersion"
case CmdPrepareFlashbackToVersion:
Expand Down Expand Up @@ -568,6 +571,10 @@ func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest {
return req.Req.(*kvrpcpb.GetHealthFeedbackRequest)
}

func (req *Request) BroadcastTxnStatus() *kvrpcpb.BroadcastTxnStatusRequest {
return req.Req.(*kvrpcpb.BroadcastTxnStatusRequest)
}

// FlashbackToVersion returns FlashbackToVersionRequest in request.
func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
Expand Down Expand Up @@ -653,6 +660,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}}
case CmdGetHealthFeedback:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}}
case CmdBroadcastTxnStatus:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BroadcastTxnStatus{BroadcastTxnStatus: req.BroadcastTxnStatus()}}
}
return nil
}
Expand Down Expand Up @@ -730,6 +739,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
return &Response{Resp: res.BufferBatchGet}, nil
case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback:
return &Response{Resp: res.GetHealthFeedback}, nil
case *tikvpb.BatchCommandsResponse_Response_BroadcastTxnStatus:
return &Response{Resp: res.BroadcastTxnStatus}, nil
}
panic("unreachable")
}
Expand Down Expand Up @@ -1143,6 +1154,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet())
case CmdGetHealthFeedback:
resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback())
case CmdBroadcastTxnStatus:
resp.Resp, err = client.BroadcastTxnStatus(ctx, req.BroadcastTxnStatus())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down
Loading
Loading