Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gateway payment tracking for AI pipelines #3358

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package core
import (
"context"
"math/big"
"strings"
"sync"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"
)

Expand Down Expand Up @@ -35,7 +37,7 @@ func (b *Balance) Credit(amount *big.Rat) {
// to send with a payment, the new credit represented by the payment and the existing credit (i.e reserved balance)
func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat) {
existingCredit := b.balances.Reserve(b.addr, b.manifestID)

glog.Infof("existing credit for manifest id %v: %v", string(b.manifestID), existingCredit.FloatString(3))
// If the existing credit exceeds the minimum credit then no tickets are required
// and the total payment value is 0
if existingCredit.Cmp(minCredit) >= 0 {
Expand Down Expand Up @@ -211,11 +213,16 @@ func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) {

func (b *Balances) cleanup() {
for id, balance := range b.balances {
b.mtx.Lock()
if int64(time.Since(balance.lastUpdate)) > int64(b.ttl) {
delete(b.balances, id)
//only cleanup Balance if not a pipeline manifestID
glog.Infof("checking to clear balance for: %v", id)
if len(strings.Split(string(id), `_`)) == 1 {
b.mtx.Lock()
if int64(time.Since(balance.lastUpdate)) > int64(b.ttl) {
glog.Infof("clearing balances session %v", id, balance.amount.FloatString(3))
delete(b.balances, id)
}
b.mtx.Unlock()
}
b.mtx.Unlock()
}
}

Expand Down
1 change: 1 addition & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (orch *orchestrator) SufficientBalance(addr ethcommon.Address, manifestID M
}

balance := orch.node.Balances.Balance(addr, manifestID)
glog.Infof("Checking balance for %v | %v: %v", addr.Hex(), string(manifestID), balance.FloatString(3))
if balance == nil || balance.Cmp(orch.node.Recipient.EV()) < 0 {
return false
}
Expand Down
20 changes: 20 additions & 0 deletions pm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Sender interface {
// StartSession creates a session for a given set of ticket params which tracks information
// for creating new tickets
StartSession(ticketParams TicketParams) string
StartSessionByID(ticketParams TicketParams, sessionID string) string
UpdateSessionByID(ticketParams TicketParams, sessionID string)

// CleanupSession deletes session from the internal map
CleanupSession(sessionID string)
Expand Down Expand Up @@ -75,6 +77,24 @@ func (s *sender) StartSession(ticketParams TicketParams) string {
return sessionID
}

func (s *sender) StartSessionByID(ticketParams TicketParams, sessionID string) string {
s.sessions.Store(sessionID, &session{
ticketParams: ticketParams,
senderNonce: 0,
})

return sessionID
}

func (s *sender) UpdateSessionByID(ticketParams TicketParams, sessionID string) {
_, err := s.loadSession(sessionID)
if err != nil {
s.StartSessionByID(ticketParams, sessionID)
}
session, _ := s.loadSession(sessionID)
session.ticketParams = ticketParams
}

// EV returns the ticket EV for a session
func (s *sender) EV(sessionID string) (*big.Rat, error) {
session, err := s.loadSession(sessionID)
Expand Down
7 changes: 7 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,13 @@
args := m.Called(ticketParams)
return args.String(0)
}
func (m *MockSender) StartSessionByID(ticketParams TicketParams, sessionID string) string {

Check failure on line 513 in pm/stub.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

unused-parameter: parameter 'sessionID' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 513 in pm/stub.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

unused-parameter: parameter 'sessionID' seems to be unused, consider removing or renaming it as _ (revive)
args := m.Called(ticketParams)
return args.String(0)
}
func (m *MockSender) UpdateSessionByID(ticketParams TicketParams, sessionID string) {

Check failure on line 517 in pm/stub.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

unused-parameter: parameter 'sessionID' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 517 in pm/stub.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

unused-parameter: parameter 'sessionID' seems to be unused, consider removing or renaming it as _ (revive)
m.Called(ticketParams)
}

// CleanupSession deletes session from the internal ma
func (m *MockSender) CleanupSession(sessionID string) {
Expand Down
62 changes: 61 additions & 1 deletion server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"sync"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/lpms/stream"
)
Expand Down Expand Up @@ -272,6 +274,8 @@ func (sel *AISessionSelector) Remove(sess *AISession) {
}

func (sel *AISessionSelector) Refresh(ctx context.Context) error {
oldBalances, oldSenderSessions := sel.getBalances()

sessions, err := sel.getSessions(ctx)
if err != nil {
return err
Expand All @@ -292,6 +296,9 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
continue
}

// update session to persist payment balances
updateSessionForAI(sess, sel.cap, sel.modelID, sel.node.Balances, oldBalances, oldSenderSessions)

if modelConstraint.Warm {
warmSessions = append(warmSessions, sess)
} else {
Expand All @@ -307,6 +314,24 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
return nil
}

func (sel *AISessionSelector) getBalances() (map[string]Balance, map[string]pm.Sender) {
balances := make(map[string]Balance)
senders := make(map[string]pm.Sender)
for _, sess := range sel.warmPool.sessMap {
balances[sess.Transcoder()] = sess.Balance
senders[sess.Transcoder()] = sess.Sender
}

for _, sess := range sel.coldPool.sessMap {
if _, ok := balances[sess.Transcoder()]; !ok {
balances[sess.Transcoder()] = sess.Balance
senders[sess.Transcoder()] = sess.Sender
}
}

return balances, senders
}

func (sel *AISessionSelector) getSessions(ctx context.Context) ([]*BroadcastSession, error) {
// No warm constraints applied here because we don't want to filter out orchs based on warm criteria at discovery time
// Instead, we want all orchs that support the model and then will filter for orchs that have a warm model separately
Expand Down Expand Up @@ -367,9 +392,19 @@ func (c *AISessionManager) Select(ctx context.Context, cap core.Capability, mode
return nil, nil
}

if err := refreshSessionIfNeeded(ctx, sess.BroadcastSession); err != nil {
//send a temp session to be refreshed
// updateSession in broadcast.go updates the orchestrator OS and ticket params.
// it also updates the pm.Sender session using ticket params and the Balance using the auth token.
// we want to persist these to new
newSess := *sess.BroadcastSession
newSess.PMSessionID = strconv.Itoa(int(cap)) + "_" + modelID + "_" + "temp"
newSess.Sender.StartSessionByID(*pmTicketParams(newSess.OrchestratorInfo.TicketParams), newSess.PMSessionID)
if err := refreshSessionIfNeeded(ctx, &newSess); err != nil {
return nil, err
}
sess.BroadcastSession.OrchestratorInfo = newSess.OrchestratorInfo
sess.Sender.UpdateSessionByID(*pmTicketParams(sess.OrchestratorInfo.TicketParams), sess.PMSessionID)
//updateSessionForAI(sess.BroadcastSession, cap, modelID, c.node.Balances)

return sess, nil
}
Expand Down Expand Up @@ -415,3 +450,28 @@ func (c *AISessionManager) getSelector(ctx context.Context, cap core.Capability,

return sel, nil
}

func updateSessionForAI(sess *BroadcastSession, cap core.Capability, modelID string, balances *core.AddressBalances, oldBalances map[string]Balance, oldSenderSessions map[string]pm.Sender) {
pipelineSessionId := strconv.Itoa(int(cap)) + "_" + modelID
transcoderUrl := sess.Transcoder()
//clean up other session
if sess.PMSessionID != pipelineSessionId {
sess.CleanupSession(sess.PMSessionID)
}
// override PMSessionID to track tickets per pipeline/model
sess.lock.Lock()
defer sess.lock.Unlock()
sess.PMSessionID = strconv.Itoa(int(cap)) + "_" + modelID
// save balance between refreshes
if oldBalance, ok := oldBalances[transcoderUrl]; ok {
sess.Balance = oldBalance
} else {
sess.Balance = core.NewBalance(ethcommon.BytesToAddress(sess.OrchestratorInfo.TicketParams.Recipient), core.ManifestID(strconv.Itoa(int(cap))+"_"+modelID), balances)
}
// save sender sessions between refreshes
if oldSenderSession, ok := oldSenderSessions[transcoderUrl]; ok {
sess.Sender = oldSenderSession
} else {
sess.Sender.UpdateSessionByID(*pmTicketParams(sess.OrchestratorInfo.TicketParams), sess.PMSessionID)
}
}
2 changes: 1 addition & 1 deletion server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func newBalanceUpdate(sess *BroadcastSession, minCredit *big.Rat) (*BalanceUpdat
}

update.NumTickets, update.NewCredit, update.ExistingCredit = sess.Balance.StageUpdate(safeMinCredit, ev)

//glog.Infof("Staged balance update - numTickets=%v newCredit=%v existingCredit=%v", update.NumTickets, update.NewCredit.FloatString(3), update.ExistingCredit.FloatString(3))
return update, nil
}

Expand Down
Loading