Expose shard information at session API #200

Open
dkropachev opened this issue Jun 28, 2024 · 0 comments

This idea comes from a PR.

Proposal

There are two levels of what we can do:

  1. Expose shard info by implementing the following API:
func (s *Session) GetShardAwareRoutingInfo(table string, columns []string, values ...interface{}) (ShardAwareRoutingInfo, error)

type ShardAwareRoutingInfo struct {
	// RoutingKey - is bytes of primary key
	RoutingKey []byte
	// Host - is node to connect (HostAware policy)
	Host *HostInfo
	// Shard - is shard ID of node to connect (ShardAware policy)
	Shard int
}
  2. Have an option to apply this optimization automatically when it makes sense and is possible.
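
For vnode-based tables, the Shard field could in principle be derived from the token of the routing key. The sketch below assumes the token-to-shard mapping that ScyllaDB describes for shard-aware drivers (bias the signed token into the unsigned range, shift out the ignored most significant bits, multiply by the shard count, keep the high 64 bits). The function name shardOf and its parameters are hypothetical and not part of the proposed API.

```go
package main

import "math/bits"

// shardOf maps a signed 64-bit token to a shard index in [0, nrShards).
// nrShards and msbIgnore are assumed to come from the node's connection
// options; both names are illustrative.
func shardOf(token int64, nrShards int, msbIgnore uint) int {
	// Bias the signed token into the unsigned range [0, 2^64).
	biased := uint64(token) + (1 << 63)
	// Drop the most significant bits that the node ignores.
	biased <<= msbIgnore
	// Scale into [0, nrShards): keep the high 64 bits of the 128-bit product.
	hi, _ := bits.Mul64(biased, uint64(nrShards))
	return int(hi)
}
```

This mapping does not apply to tablet-based tables, where the owning shard has to be looked up rather than computed.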

Implementation details

To make this work properly for tablets, we will need to pull tablet info from system.tablets beforehand.
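
Once the tablet info is cached on the client, finding the replica for a token is a search over token ranges. The following is a minimal sketch under the assumption that each cached tablet is identified by the last token it owns and carries a list of (host, shard) replicas. The types tablet and replica and the function findTablet are hypothetical and do not mirror the real table schema.

```go
package main

import "sort"

// replica is a hypothetical (host, shard) pair that holds a tablet.
type replica struct {
	HostID string
	Shard  int
}

// tablet is a hypothetical cached entry: it owns every token greater than the
// previous tablet's LastToken, up to and including its own LastToken.
type tablet struct {
	LastToken int64
	Replicas  []replica
}

// findTablet returns the tablet that owns token.
// tablets must be sorted by LastToken in ascending order.
func findTablet(tablets []tablet, token int64) (tablet, bool) {
	// Binary search for the first tablet whose range ends at or after token.
	i := sort.Search(len(tablets), func(i int) bool {
		return tablets[i].LastToken >= token
	})
	if i == len(tablets) {
		return tablet{}, false
	}
	return tablets[i], true
}
```

With such a lookup in place, Host and Shard in ShardAwareRoutingInfo could be filled from the chosen replica instead of being computed from the token.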

Pseudo code example

Borrowed from the same PR:

// NOTE: Session.GetShardAwareRoutingInfo is the API proposed in this issue; it does not exist yet.
import (
	"log"
	"slices" // slices.Chunk requires Go 1.23+
	"strconv"
	"sync"

	"github.com/gocql/gocql"
)

func routeRangeSelectToProperShards(session *gocql.Session, keyspaceName, tableName string, pks []int64) {
	const shardsAbout = 100 // rough shard count: node * (cpu-1)
	// Group the keys by target shard.
	var (
		queryBatches = make(map[string][]int64, shardsAbout) // keys grouped by "host/shard"
		routingKeys  = make(map[string][]byte, shardsAbout)  // one routing key per "host/shard"
	)
	for _, pk := range pks {
		var (
			shardID    string
			routingKey []byte
		)
		// Look up the routing information for each key.
		// In this example, PRIMARY KEY consists of one column pk_column_name.
		// This call also passes the keyspace, which the signature proposed above does not include.
		info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk)
		if err != nil || info.Host == nil {
			// Routing information may be unavailable for various reasons (schema or topology change, etc.).
			// It is important to understand the reason when testing (for example, you are not using tokenAwarePolicy).
			// Such keys end up in the "" batch and are queried without a routing key.
			log.Printf("can't get shard id of pk '%d': %v", pk, err)
		} else {
			// Build the key: host + "/" + shard (127.0.0.1/1)
			shardID = info.Host.Hostname() + "/" + strconv.Itoa(info.Shard)
			routingKey = info.RoutingKey
		}
		// Put the key into the corresponding batch.
		batch := queryBatches[shardID]
		if batch == nil {
			batch = make([]int64, 0, len(pks)/shardsAbout)
		}
		batch = append(batch, pk)
		queryBatches[shardID] = batch
		routingKeys[shardID] = routingKey
	}
	// A single bind marker after IN takes the whole chunk as a list.
	const query = "SELECT * FROM table_name WHERE pk IN ?"
	var wg sync.WaitGroup
	// Go through all the batches and execute the queries in parallel.
	for shard, batch := range queryBatches {
		// Split large batches into smaller chunks, since large IN lists in SELECT queries hurt Scylla response time.
		for chunk := range slices.Chunk(batch, 10) { // chunks of at most 10 keys
			wg.Add(1)
			go func(shard string, chunk []int64) {
				defer wg.Done()
				rk := routingKeys[shard] // routing key for this shard
				scanner := session.Query(query, chunk).RoutingKey(rk).Iter().Scanner() // use RoutingKey
				for scanner.Next() {
					// ...
				}
				if err := scanner.Err(); err != nil {
					// ...
				}
			}(shard, chunk)
		}
	}
	// Wait for all responses.
	wg.Wait()
	// NOTE: this is not the optimal strategy, because we wait for all queries to finish.
	// If even one query has a long response time, it affects the response time of the whole method (RT of the method = max RT of the queries).
}
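
One possible way to soften the limitation mentioned in the final NOTE is to hand results to the caller as each chunk completes, instead of blocking until the slowest query returns. The sketch below is an illustration only: streamChunks and its fetch callback are hypothetical names, and fetch stands in for running one per-shard query and collecting its rows.

```go
package main

import "sync"

// streamChunks runs fetch for every chunk concurrently and delivers each
// result on the returned channel as soon as it is ready. The channel is
// closed after all fetches have finished.
func streamChunks[K any, R any](chunks []K, fetch func(K) R) <-chan R {
	out := make(chan R)
	var wg sync.WaitGroup
	for _, c := range chunks {
		wg.Add(1)
		go func(c K) {
			defer wg.Done()
			out <- fetch(c)
		}(c)
	}
	// Close the channel once every worker has sent its result.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```

The caller ranges over the returned channel and can start processing the first rows while slower shards are still responding. Total wall time is still bounded by the slowest query, but the time to first result is not.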