Skip to content

Commit

Permalink
Merge pull request #132 from statechannels/multi-hop
Browse files Browse the repository at this point in the history
Add support for multihop payments
  • Loading branch information
lalexgap authored Oct 20, 2022
2 parents 3df3fa6 + 0699615 commit f74bb77
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 23 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type RunConfig struct {
NumPayees uint
NumPayers uint
NumPayeePayers uint
NumIntermediaries uint
ConcurrentPaymentJobs uint
NetworkJitter time.Duration
NetworkLatency time.Duration
Expand All @@ -37,6 +38,7 @@ func GetRunConfig(runEnv *runtime.RunEnv) (RunConfig, error) {
config.NumPayees = uint(runEnv.IntParam(string(numPayeeParam)))
config.NumPayers = uint(runEnv.IntParam(string(numPayersParam)))
config.NumPayeePayers = uint(runEnv.IntParam(string(NumPayeePayersParam)))
config.NumIntermediaries = uint(runEnv.IntParam(string(NumIntermediaries)))
config.NetworkJitter = time.Duration(runEnv.IntParam(string(networkJitterParam))) * time.Millisecond
config.NetworkLatency = time.Duration(runEnv.IntParam(string(networkLatencyParam))) * time.Millisecond
config.PaymentTestDuration = time.Duration(runEnv.IntParam(string(paymentTestDurationParam))) * time.Second
Expand Down
1 change: 1 addition & 0 deletions config/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
numPayeeParam param = "numOfPayees"
numPayersParam param = "numOfPayers"
NumPayeePayersParam param = "numOfPayeePayers"
NumIntermediaries param = "numOfIntermediaries"
networkJitterParam param = "networkJitter"
networkLatencyParam param = "networkLatency"
concurrentPaymentJobsParam param = "concurrentPaymentJobs"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/miguelmota/go-ethereum-hdwallet v0.1.1
github.com/multiformats/go-multiaddr v0.7.0
github.com/statechannels/go-nitro v0.0.0-20221006155253-f0e74994b9e4
github.com/statechannels/go-nitro v0.0.0-20221009024643-ab7b1a648d10
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,8 @@ github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzu
github.com/spf13/pflag v1.0.1-0.20171106142849-4c012f6dcd95/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/statechannels/go-nitro v0.0.0-20221006155253-f0e74994b9e4 h1:3I63UGzArK77M6umURgToOzi/WkJ3UXfW6kOkEPUYfU=
github.com/statechannels/go-nitro v0.0.0-20221006155253-f0e74994b9e4/go.mod h1:Dg68KKTZKjsWhSQbSa5HLJtzzgxtGqofj0JoqMmb8RU=
github.com/statechannels/go-nitro v0.0.0-20221009024643-ab7b1a648d10 h1:xRQnotT6CXS7MQ8y1ZeQxs3gJ0JOqDcaAVOZwstL/CE=
github.com/statechannels/go-nitro v0.0.0-20221009024643-ab7b1a648d10/go.mod h1:Dg68KKTZKjsWhSQbSa5HLJtzzgxtGqofj0JoqMmb8RU=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ isNightly = {type = "bool", default = false, desc = "Whether this test is being
networkJitter = {type = "int", unit = "milliseconds", default = 0}
networkLatency = {type = "int", unit = "milliseconds", default = 0}
numOfHubs = {type = "int", default = 1, desc = "The number of instances that should play the role of the hub"}
numOfIntermediaries = {type = "int", default = 1, desc = "The number of intermediaries(hops) to use in the virtual payment channel"}
numOfPayeePayers = {type = "int", default = 0, desc = "The number of instances that should play the role of the payeepayer"}
numOfPayees = {type = "int", default = 1, desc = "The number of instances that should play the role of the payee"}
numOfPayers = {type = "int", default = 1, desc = "The number of instances that should play the role of the payer"}
Expand Down
1 change: 1 addition & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type PeerInfo struct {
p2pms.PeerInfo
Role Role
Seq int64
}

// IsPayer returns true if the peer's role is a Payer or PayeePayer
Expand Down
42 changes: 25 additions & 17 deletions tests/virtual-payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
if err != nil {
panic(err)
}

// Test
role := peer.GetRole(seq, runConfig)
// We use the sequence in the random source so we generate a unique key even if another client is running at the same time
privateKey, err := crypto.GenerateKey()
Expand All @@ -66,10 +66,10 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
ms := p2pms.NewMessageService(ipAddress, port, pk)
client.MustSignalAndWait(ctx, "msStarted", runEnv.TestInstanceCount)

mePeerInfo := peer.PeerInfo{PeerInfo: p2pms.PeerInfo{Address: address, IpAddress: ipAddress, Port: port, Id: ms.Id()}, Role: role}
mePeerInfo := peer.PeerInfo{PeerInfo: p2pms.PeerInfo{Address: address, IpAddress: ipAddress, Port: port, Id: ms.Id()}, Role: role, Seq: seq}
me := peer.MyInfo{PeerInfo: mePeerInfo, PrivateKey: *privateKey}

runEnv.RecordMessage("I am %+v", me)
runEnv.RecordMessage("I am address:%s role:%d seq:%d", me.Address, me.Role, me.Seq)

utils.RecordRunInfo(me, runConfig, runEnv.R())

Expand Down Expand Up @@ -104,13 +104,8 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err

client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount)

ledgerIds := []types.Destination{}

if me.Role != peer.Hub {
// Create ledger channels with all the hubs
ledgerIds = utils.CreateLedgerChannels(nClient, cm, utils.FINNEY_IN_WEI, me.PeerInfo, peers)

}
// Create ledger channels with all the hubs
ledgerIds := utils.CreateLedgerChannels(nClient, cm, utils.FINNEY_IN_WEI, me.PeerInfo, peers)

client.MustSignalAndWait(ctx, sync.State("ledgerDone"), runEnv.TestInstanceCount)

Expand All @@ -121,7 +116,7 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
payees = append(payees, peer.FilterByRole(peers, peer.PayerPayee)...)

createVirtualPaymentsJob := func() {
randomHub := utils.SelectRandom(hubs)
selectedHubs := utils.SelectRandomHubs(hubs, int(runConfig.NumIntermediaries))
randomPayee := utils.SelectRandom(payees)

var channelId types.Destination
Expand All @@ -140,12 +135,12 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
},
}}

r := nClient.CreateVirtualPaymentChannel([]types.Address{randomHub.Address}, randomPayee.Address, 0, outcome)
r := nClient.CreateVirtualPaymentChannel(selectedHubs, randomPayee.Address, 0, outcome)

channelId = r.ChannelId
cm.WaitForObjectivesToComplete([]protocols.ObjectiveId{r.Id})

runEnv.RecordMessage("Opened virtual channel %s with %s using hub %s", utils.Abbreviate(channelId), utils.Abbreviate(randomPayee.Address), utils.Abbreviate(randomHub.Address))
runEnv.RecordMessage("Opened virtual channel %s with %s using hubs %s", utils.Abbreviate(channelId), utils.Abbreviate(randomPayee.Address), utils.AbbreviateSlice(selectedHubs))

paymentAmount := big.NewInt(utils.KWEI_IN_WEI)
nClient.Pay(r.ChannelId, paymentAmount)
Expand Down Expand Up @@ -178,16 +173,29 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err

// Run the job(s)
utils.RunJobs(createVirtualPaymentsJob, runConfig.PaymentTestDuration, int64(runConfig.ConcurrentPaymentJobs))

// We wait to allow hubs to finish processing messages and close out their channels.
// The duration we wait is based on the payment test duration and the amount of concurrent jobs.
toSleep := runConfig.PaymentTestDuration / 10 * time.Duration(runConfig.ConcurrentPaymentJobs)
// Restrict the sleep duration to be between 1 and 30 seconds
if toSleep > 30*time.Second {
toSleep = 30 * time.Second
}
if toSleep < 1*time.Second {
toSleep = 1 * time.Second
}
runEnv.RecordMessage("Waiting %s before closing ledger channels", toSleep)
time.Sleep(toSleep)
}
client.MustSignalAndWait(ctx, "paymentsDone", runEnv.TestInstanceCount)

if me.Role != peer.Hub {
// TODO: Closing a ledger channel too soon after closing a virtual channel seems to fail.
time.Sleep(time.Duration(250 * time.Millisecond))
// TODO: Closing as a hub seems to fail: https://github.com/statechannels/go-nitro-testground/issues/134
// For now we avoid closing as a hub
if len(ledgerIds) > 0 && me.Role != peer.Hub {
// Close all the ledger channels with the hub
oIds := []protocols.ObjectiveId{}
for _, ledgerId := range ledgerIds {
runEnv.RecordMessage("Closing ledger %s", utils.Abbreviate(ledgerId))

oId := nClient.CloseLedgerChannel(ledgerId)
oIds = append(oIds, oId)
}
Expand Down
54 changes: 51 additions & 3 deletions utils/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"math/rand"
"runtime/debug"
"sort"

s "sync"
"sync/atomic"
Expand Down Expand Up @@ -95,6 +96,19 @@ func RunJobs(job func(), duration time.Duration, concurrencyTarget int64) {
wg.Wait()
}

// AbbreviateSlice returns a string with abbreviated elements of the given slice.
func AbbreviateSlice[U ~[]T, T fmt.Stringer](col U) string {
abbreviated := ""
for i, s := range col {
if i > 0 {
abbreviated += ", "
}
abbreviated += s.String()[0:8]
}

return abbreviated
}

// Abbreviate shortens a string to 8 characters and adds an ellipsis.
func Abbreviate(s fmt.Stringer) string {
return s.String()[0:8] + ".."
Expand Down Expand Up @@ -128,16 +142,51 @@ func SelectRandom[U ~[]T, T any](collection U) T {
return collection[randomIndex]
}

// SelectRandomHubs selects numHub hubs randomly from hubs
func SelectRandomHubs(hubs []peer.PeerInfo, numHubs int) []types.Address {
// Copy and shuffle the slice of hubs
shuffled := make([]peer.PeerInfo, len(hubs))
copy(shuffled, hubs)
rand.Shuffle(len(shuffled),
func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

// Select the amount of hubs we want
selected := make([]peer.PeerInfo, numHubs)
for i := 0; i < numHubs; i++ {
selected[i] = shuffled[i]
}

// TODO: Virtual defunding seems to fail if intermediaries are not in "order".
// The order seems to be determined by the initiator of the ledger channel.
// Since we use the sequence number to determine the initiator, we can just sort on sequence number.
sort.Slice(selected, func(i, j int) bool {
return selected[i].Seq < selected[j].Seq
})

// Convert to addresses for the callers convenience
selectedAddresses := make([]types.Address, numHubs)
for i, hub := range selected {
selectedAddresses[i] = hub.Address
}
return selectedAddresses
}

// CreateLedgerChannels creates a directly funded ledger channel with each hub in hubs.
// The funding for each channel will be set to amount for both participants.
// This function blocks until all ledger channels have successfully been created.
// If the participant is a hub we use the participant's Seq to determine the initiator of the ledger channel.
func CreateLedgerChannels(client nitro.Client, cm *CompletionMonitor, amount uint, me peer.PeerInfo, peers []peer.PeerInfo) []types.Destination {
ids := []protocols.ObjectiveId{}
cIds := []types.Destination{}
for _, p := range peers {
if p.Role != peer.Hub {
hubs := peer.FilterByRole(peers, peer.Hub)
for _, p := range hubs {

// To co-ordinate creating ledger channels between hubs a hub will
// only create a channel with another hub if it has a greater sequence number.
if me.Role == peer.Hub && p.Seq <= me.Seq {
continue
}

outcome := outcome.Exit{outcome.SingleAssetExit{
Allocations: outcome.Allocations{
outcome.Allocation{
Expand All @@ -150,7 +199,6 @@ func CreateLedgerChannels(client nitro.Client, cm *CompletionMonitor, amount uin
},
},
}}

r := client.CreateLedgerChannel(p.Address, 0, outcome)
cIds = append(cIds, r.ChannelId)
ids = append(ids, r.Id)
Expand Down

0 comments on commit f74bb77

Please sign in to comment.