Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync ForT0 balances for T1 refs who fitting T0's ref limit #182

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/ice-blockchain/eskimo v1.326.0
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb
github.com/ice-blockchain/wintr v1.140.0
github.com/ice-blockchain/wintr v1.141.0
github.com/imroc/req/v3 v3.43.7
github.com/oklog/ulid/v2 v2.1.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -110,7 +110,7 @@ require (
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -207,7 +207,7 @@ require (
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.22.0 // indirect
google.golang.org/api v0.184.0 // indirect
google.golang.org/api v0.185.0 // indirect
google.golang.org/appengine/v2 v2.0.6 // indirect
google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA=
github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
Expand Down Expand Up @@ -304,8 +304,8 @@ github.com/ice-blockchain/eskimo v1.326.0 h1:MNey0egcmR42Lsmi2A4nnl3h2nYKWkyQIry
github.com/ice-blockchain/eskimo v1.326.0/go.mod h1:ACvCKV6SSS+B/AudcS1zHzQ8BiMpsgk1PyeokF4ybxY=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8=
github.com/ice-blockchain/wintr v1.140.0 h1:OGjjZHCD5WWcJ8BRQ5Dmv1xGSPe8DvqSuPwtpXZxUC4=
github.com/ice-blockchain/wintr v1.140.0/go.mod h1:TFymLSdt+kCAydDS9qmt37+quPUS6Amd5RMvV/4q77g=
github.com/ice-blockchain/wintr v1.141.0 h1:JkQkjXoXLmnLh3hoRzr1SbiVXvdzNaWrkHjRxQqS1yQ=
github.com/ice-blockchain/wintr v1.141.0/go.mod h1:6INwquKAzF28T1DwAh/n3fPTpuyGQQz9cSUbDjxSCAY=
github.com/imroc/req/v3 v3.43.7 h1:dOcNb9n0X83N5/5/AOkiU+cLhzx8QFXjv5MhikazzQA=
github.com/imroc/req/v3 v3.43.7/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA=
github.com/ip2location/ip2location-go/v9 v9.7.0 h1:ipwl67HOWcrw+6GOChkEXcreRQR37NabqBd2ayYa4Q0=
Expand Down Expand Up @@ -674,8 +674,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/api v0.184.0 h1:dmEdk6ZkJNXy1JcDhn/ou0ZUq7n9zropG2/tR4z+RDg=
google.golang.org/api v0.184.0/go.mod h1:CeDTtUEiYENAf8PPG5VZW2yNp2VM3VWbCeTioAZBTBA=
google.golang.org/api v0.185.0 h1:ENEKk1k4jW8SmmaT6RE+ZasxmxezCrD5Vw4npvr+pAU=
google.golang.org/api v0.185.0/go.mod h1:HNfvIkJGlgrIlrbYkAm9W9IdkmKZjOTVh33YltygGbg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw=
Expand Down
4 changes: 2 additions & 2 deletions miner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func init() {

type telemetry struct {
registry metrics.Registry
steps [10]string
steps [11]string
currentStepName string
cfg config
}
Expand All @@ -31,7 +31,7 @@ func (t *telemetry) mustInit(cfg config) *telemetry {
)
t.cfg = cfg
t.registry = metrics.NewRegistry()
t.steps = [10]string{"mine[full iteration]", "mine", "get_users", "get_referrals", "send_messages", "get_history", "sync_quiz_status", "insert_history", "collect_coin_distributions", "update_users"} //nolint:lll // .
t.steps = [11]string{"mine[full iteration]", "mine", "get_users", "get_referrals", "get_t1_rankings", "send_messages", "get_history", "sync_quiz_status", "insert_history", "collect_coin_distributions", "update_users"} //nolint:lll // .
for ix := range &t.steps {
if ix > 1 {
t.steps[ix] = fmt.Sprintf("[%v]mine.%v", ix-1, t.steps[ix])
Expand Down
105 changes: 88 additions & 17 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
t0Referrals, tMinus1Referrals = make(map[int64]*referral, batchSize), make(map[int64]*referral, batchSize)
t1ReferralsToIncrementActiveValue, t2ReferralsToIncrementActiveValue = make(map[int64]int32, batchSize), make(map[int64]int32, batchSize)
t1ReferralsThatStoppedMining, t2ReferralsThatStoppedMining = make(map[int64]uint32, batchSize), make(map[int64]uint32, batchSize)
activeReferralsOfT0, activeReferralsOfT0RankingKeys = make(map[int64]map[int64]struct{}, batchSize), make(map[string]int64, batchSize)
balanceT1EthereumIncr, balanceT2EthereumIncr = make(map[int64]float64, batchSize), make(map[int64]float64, batchSize)
pendingBalancesForTMinus1, pendingBalancesForT0 = make(map[int64]float64, batchSize), make(map[int64]float64, batchSize)
referralsThatStoppedMining = make([]*referralThatStoppedMining, 0, batchSize)
Expand Down Expand Up @@ -246,6 +247,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
for k := range t2ReferralsToIncrementActiveValue {
delete(t2ReferralsToIncrementActiveValue, k)
}
for k := range activeReferralsOfT0RankingKeys {
delete(activeReferralsOfT0RankingKeys, k)
}
for k := range activeReferralsOfT0 {
delete(activeReferralsOfT0, k)
}
for k := range balanceT1EthereumIncr {
delete(balanceT1EthereumIncr, k)
}
Expand Down Expand Up @@ -327,10 +334,6 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
go m.telemetry.collectElapsed(3, *before.Time)
}

/******************************************************************************************************************************************************
3. Mining for the users.
******************************************************************************************************************************************************/

for _, ref := range referralResults {
if !isAdvancedTeamDisabled(ref.LatestDevice) {
if _, found := tMinus1Referrals[ref.ID]; found {
Expand All @@ -341,6 +344,65 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
t0Referrals[ref.ID] = ref
}
}

/******************************************************************************************************************************************************
3. Fetch ranking of active T1 referrals (N most recent active), to update ForT0Balance for them
******************************************************************************************************************************************************/
for idT0 := range t0Referrals {
t0Ref := t0Referrals[idT0]
if t0Ref.MiningBoostLevelIndex != nil {
activeReferralsOfT0RankingKeys[fmt.Sprintf("%v_active_t1_referrals_ranking", idT0)] = idT0
}
}
if len(activeReferralsOfT0RankingKeys) > 0 {
before = time.Now()
reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline)
if zRangeResult, zErr := m.db.Pipelined(reqCtx, func(pipeliner redis.Pipeliner) error {
for key, t0Id := range activeReferralsOfT0RankingKeys {
t0Ref := t0Referrals[t0Id]
rangeBy := &redis.ZRangeBy{Min: "0", Max: "+inf", Offset: int64(0), Count: int64((*cfg.miningBoostLevels.Load())[int(*t0Ref.MiningBoostLevelIndex)].MaxT1Referrals)}
if _, err := pipeliner.ZRevRangeByScore(ctx, key, rangeBy).Result(); err != nil {
return err
}
}
return nil
}); zErr != nil {
log.Error(errors.Wrapf(zErr, "[miner] failed to get active T1 referrals ranking for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber))
reqCancel()
resetVars(false)

continue
} else {
reqCancel()
for _, cmdRes := range zRangeResult {
if cmdRes.Err() != nil {
errs = append(errs, errors.Wrapf(cmdRes.Err(), "failed to fetch active referrals ranking %#v for batchNumber:%v,workerNumber:%v", cmdRes.Args(), batchNumber, workerNumber))
continue
}
sliceRes := cmdRes.(*redis.StringSliceCmd)
key := sliceRes.Args()[1].(string)
ids := make(map[int64]struct{}, 0)
for _, v := range sliceRes.Val() {
id, _ := strconv.Atoi(v)
ids[int64(id)] = struct{}{}
}
activeReferralsOfT0[activeReferralsOfT0RankingKeys[key]] = ids
}
if cmdErr := multierror.Append(nil, errs...).ErrorOrNil(); cmdErr != nil {
log.Error(errors.Wrapf(cmdErr, "[miner] failed to get active T1 referrals ranking for batchNumber:%v,workerNumber:%v", batchNumber, workerNumber))
reqCancel()
resetVars(false)

continue
}
}
go m.telemetry.collectElapsed(4, *before.Time)
}

/******************************************************************************************************************************************************
4. Mining for the users.
******************************************************************************************************************************************************/

shouldSynchronizeBalance := shouldSynchronizeBalanceFunc(uint64(batchNumber))
for _, usr := range userResults {
if usr.UserID == "" {
Expand All @@ -362,7 +424,13 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
if isAdvancedTeamDisabled(usr.LatestDevice) {
usr.ActiveT2Referrals = 0
}
updatedUser, shouldGenerateHistory, IDT0Changed, pendingAmountForTMinus1, pendingAmountForT0 := mine(now, usr, t0Ref, tMinus1Ref)
userFittingToT0RefLimit := false
if t0Ref != nil {
if t0ActiveRefs, t0HasActiveRefs := activeReferralsOfT0[t0Ref.ID]; t0HasActiveRefs {
_, userFittingToT0RefLimit = t0ActiveRefs[usr.ID]
}
}
updatedUser, shouldGenerateHistory, IDT0Changed, pendingAmountForTMinus1, pendingAmountForT0 := mine(now, usr, t0Ref, tMinus1Ref, userFittingToT0RefLimit)
if shouldGenerateHistory {
syncQuizUserIDs = append(syncQuizUserIDs, usr.UserID)
userHistoryKeys = append(userHistoryKeys, usr.Key())
Expand Down Expand Up @@ -433,11 +501,14 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}

/******************************************************************************************************************************************************
4. Sending messages to the broker.
5. Sending messages to the broker.
******************************************************************************************************************************************************/

before = time.Now()
reqCtx, reqCancel = context.WithTimeout(context.Background(), requestDeadline)
if len(errs) != 0 {
errs = errs[:0]
}
for _, message := range msgs {
m.mb.SendMessage(reqCtx, message, msgResponder)
}
Expand All @@ -453,11 +524,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}
reqCancel()
if len(msgs) > 0 {
go m.telemetry.collectElapsed(4, *before.Time)
go m.telemetry.collectElapsed(5, *before.Time)
}

/******************************************************************************************************************************************************
5. Fetching all relevant fields that will be added to the history/bookkeeping.
6. Fetching all relevant fields that will be added to the history/bookkeeping.
******************************************************************************************************************************************************/

before = time.Now()
Expand All @@ -472,11 +543,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
reqCancel()

if len(userHistoryKeys) > 0 {
go m.telemetry.collectElapsed(5, *before.Time)
go m.telemetry.collectElapsed(6, *before.Time)
}

/******************************************************************************************************************************************************
6. Syncing quiz state
7. Syncing quiz state
******************************************************************************************************************************************************/
before = time.Now()
if false && (len(syncQuizUserIDs) > 0 && len(histories) > 0) {
Expand All @@ -498,12 +569,12 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
histories[i].KYCQuizCompleted = quizSync.KYCQuizCompleted
}
}
go m.telemetry.collectElapsed(6, *before.Time)
go m.telemetry.collectElapsed(7, *before.Time)
}
}

/******************************************************************************************************************************************************
7. Inserting history/bookkeeping data.
8. Inserting history/bookkeeping data.
******************************************************************************************************************************************************/

before = time.Now()
Expand All @@ -517,11 +588,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}
reqCancel()
if len(histories) > 0 {
go m.telemetry.collectElapsed(7, *before.Time)
go m.telemetry.collectElapsed(8, *before.Time)
}

/******************************************************************************************************************************************************
8. Processing Ethereum Coin Distributions for eligible users.
9. Processing Ethereum Coin Distributions for eligible users.
******************************************************************************************************************************************************/

before = time.Now()
Expand All @@ -535,11 +606,11 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}
reqCancel()
if len(coinDistributions) > 0 {
go m.telemetry.collectElapsed(8, *before.Time)
go m.telemetry.collectElapsed(9, *before.Time)
}

/******************************************************************************************************************************************************
9. Persisting the mining progress for the users.
10. Persisting the mining progress for the users.
******************************************************************************************************************************************************/

for _, usr := range referralsThatStoppedMining {
Expand Down Expand Up @@ -667,7 +738,7 @@ func (m *miner) mine(ctx context.Context, workerNumber int64) {
}
}
if transactional || len(updatedUsers) > 0 {
go m.telemetry.collectElapsed(9, *before.Time)
go m.telemetry.collectElapsed(10, *before.Time)
}

batchNumber++
Expand Down
6 changes: 4 additions & 2 deletions miner/mining.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ice-blockchain/wintr/time"
)

func mine(now *time.Time, usr *user, t0Ref, tMinus1Ref *referral) (updatedUser *user, shouldGenerateHistory, IDT0Changed bool, pendingAmountForTMinus1, pendingAmountForT0 float64) {
func mine(now *time.Time, usr *user, t0Ref, tMinus1Ref *referral, userFittingToT0RefLimit bool) (updatedUser *user, shouldGenerateHistory, IDT0Changed bool, pendingAmountForTMinus1, pendingAmountForT0 float64) {
if usr == nil || usr.MiningSessionSoloStartedAt.IsNil() || usr.MiningSessionSoloEndedAt.IsNil() {
return nil, false, false, 0, 0
}
Expand Down Expand Up @@ -119,7 +119,9 @@ func mine(now *time.Time, usr *user, t0Ref, tMinus1Ref *referral) (updatedUser *
}
if t0Ref != nil && !t0Ref.MiningSessionSoloEndedAt.IsNil() && t0Ref.MiningSessionSoloEndedAt.After(*now.Time) {
rate := 25 * baseMiningRate * elapsedTimeFraction / 100
updatedUser.BalanceForT0 += rate
if userFittingToT0RefLimit {
updatedUser.BalanceForT0 += rate
}
updatedUser.BalanceT0 += rate
mintedAmount += rate

Expand Down
Loading