Skip to content

Commit

Permalink
feat(netsim): model geographical links delays
Browse files Browse the repository at this point in the history
This change allows us to model geographical links delays, which
makes running this simulation a bit more realistic.
  • Loading branch information
bassosimone committed Nov 27, 2024
1 parent b37fe75 commit e402883
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 2 deletions.
11 changes: 9 additions & 2 deletions netsim/example_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"io"
"log"
"net/http"
"time"

"github.com/rbmk-project/x/netsim"
"github.com/rbmk-project/x/netsim/geolink"
)

// This example shows how to use a router to simulate a network
Expand All @@ -33,9 +35,14 @@ func Example_router() {
// Create server stack emulating www.example.com.
scenario.Attach(scenario.MustNewExampleComStack())

// Create and attach the client stack.
// Create the client stack, build a geographic point-to-point link
// and attach the scenario router to the other end of the link.
clientStack := scenario.MustNewClientStack()
scenario.Attach(clientStack)
linkDev := geolink.Extend(clientStack, &geolink.Config{
Delay: 10 * time.Millisecond,
Log: true,
})
scenario.Attach(linkDev)

// Create the HTTP client
clientTxp := scenario.NewHTTPTransport(clientStack)
Expand Down
162 changes: 162 additions & 0 deletions netsim/geolink/geolink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//
// SPDX-License-Identifier: BSD-3-Clause
//
// Adapted from: https://github.com/ooni/netem/blob/main/linkfwdfull.go
//

// Package geolink models a geographic point-to-point link.
package geolink

import (
"log"
"net/netip"
"time"

"github.com/rbmk-project/x/netsim/packet"
)

// Config configures a geographic point-to-point link.
type Config struct {
// Delay is the propagation delay.
Delay time.Duration

// Log enables logging of delivered packets.
Log bool
}

// baseDevice is the common implementation for the
// devices type returned by this package.
type baseDevice struct {
input chan *packet.Packet
output chan *packet.Packet
}

func (*baseDevice) Addresses() []netip.Addr {
return nil
}

func (*baseDevice) EOF() <-chan struct{} {
return nil
}

// internalDevice wraps baseDevice and swaps input/output channels. This is required
// to properly forward packets between devices because the internal device's output is
// connected to the base device's input and the internal device's input is connected
// to the base device's output.
type internalDevice struct {
*baseDevice
}

func (id *internalDevice) Input() chan<- *packet.Packet {
return id.output
}

func (id *internalDevice) Output() <-chan *packet.Packet {
return id.input
}

// externalDevice presents the public interface of the
// geographic link. It preserves the normal channel direction
// (input for receiving, output for sending) and is what
// we return to external callers.
type externalDevice struct {
*baseDevice
}

func (ed *externalDevice) Input() chan<- *packet.Packet {
return ed.input
}

func (ed *externalDevice) Output() <-chan *packet.Packet {
return ed.output
}

// Extend creates a geographic link between the
// given device and the returned device.
//
// Internally, this creates the following link:
//
// external <=> dev
//
// where:
//
// - dev is the device passed as argument
//
// - external is the device returned to the caller
//
// Packets flowing through this chain experience
// the configured delay in both directions.
//
// We create two goroutines for forwarding packets,
// which are closed when dev is closed.
func Extend(dev packet.NetworkDevice, config *Config) packet.NetworkDevice {
input, output := packet.NewNetworkDeviceIOChannels()
local := &baseDevice{
input: input,
output: output,
}
go forward(dev, &internalDevice{local}, config)
go forward(&internalDevice{local}, dev, config)
return &externalDevice{local}
}

type sourceDevice interface {
EOF() <-chan struct{}
Output() <-chan *packet.Packet
}

type destDevice interface {
EOF() <-chan struct{}
Input() chan<- *packet.Packet
}

// forward implements packet forwarding with propagation delay.
//
// It maintains a queue of packets and uses a timer to implement the
// configured delay. The timer is only active when there are
// packets to forward, otherwise it runs with a long interval to
// avoid consuming resources.
//
// Packets are forwarded in order and the delay is applied to each
// packet individually. This models how packets travel through a
// physical link where the propagation delay applies to each packet.
func forward(src sourceDevice, dst destDevice, config *Config) {
delay := max(time.Millisecond, config.Delay)
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
var packets []*packet.Packet
for {
select {
case pkt := <-src.Output():
packets = append(packets, pkt)
if len(packets) == 1 {
ticker.Reset(delay)
}

case <-ticker.C:
pkt := packets[0]
packets = packets[1:]
if len(packets) <= 0 {
ticker.Reset(time.Minute)
}

if config.Log {
log.Printf("geolink: %s", pkt)
}

select {
case dst.Input() <- pkt:
// delivered to destination
case <-src.EOF():
return
case <-dst.EOF():
return
}

case <-src.EOF():
return
case <-dst.EOF():
return
}
}
}

0 comments on commit e402883

Please sign in to comment.