Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for deterministic listener ports (based on broker ID) #183

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func initFlags() {
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")

Expand Down
7 changes: 6 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ var (
Version = "unknown"
)

type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)

type ListenerConfig struct {
BrokerAddress string
ListenerAddress string
AdvertisedAddress string
}
type IdListenerConfig struct {
BrokerAddress string
Listener net.Listener
}
type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down Expand Up @@ -74,6 +78,7 @@ type Config struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DeterministicListeners bool
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
Expand Down
7 changes: 4 additions & 3 deletions proxy/processor_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxy
import (
"bytes"
"encoding/hex"
"testing"
"time"

"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestHandleRequest(t *testing.T) {
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
}

func TestHandleResponse(t *testing.T) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" {
switch brokerPort {
case 19092:
Expand Down
13 changes: 11 additions & 2 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand Down Expand Up @@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
if !ok {
return errors.New("broker.port not found")
}
nodeId, ok := broker.Get(nodeKeyName).(int32)
if !ok {
return errors.New("broker.node_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
if !ok {
return errors.New("coordinator.port not found")
}
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
if !ok {
return errors.New("coordinator.node_id not found")
}

if host == "" && port <= 0 {
return nil
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package protocol
import (
"encoding/hex"
"fmt"
"github.com/google/uuid"
"reflect"
"strings"
"testing"

"github.com/google/uuid"

"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -20,7 +21,7 @@ var (
// topic_metadata
0x00, 0x00, 0x00, 0x00}

testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "myhost1", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand All @@ -31,7 +32,7 @@ var (
return "", 0, errors.New("unexpected data")
}

testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 19092 {
return "myhost1", 34001, nil
} else if brokerHost == "localhost" && brokerPort == 29092 {
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "azure.microsoft.com", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand Down
58 changes: 42 additions & 16 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ type Listeners struct {

listenFunc ListenFunc

deterministicListeners bool
disableDynamicListeners bool
dynamicSequentialMinPort int

brokerToListenerConfig map[string]config.ListenerConfig
lock sync.RWMutex
brokerToListenerConfig map[string]config.ListenerConfig
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
lock sync.RWMutex
}

func NewListeners(cfg *config.Config) (*Listeners, error) {
Expand Down Expand Up @@ -64,15 +66,19 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
return nil, err
}

brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)

return &Listeners{
defaultListenerIP: defaultListenerIP,
dynamicAdvertisedListener: dynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
defaultListenerIP: defaultListenerIP,
dynamicAdvertisedListener: dynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
deterministicListeners: cfg.Proxy.DeterministicListeners,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
}, nil
}

Expand Down Expand Up @@ -117,7 +123,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
return brokerToListenerConfig, nil
}

func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "" || brokerPort <= 0 {
return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort)
}
Expand All @@ -126,30 +132,49 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l

p.lock.RLock()
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
p.lock.RUnlock()

if ok {
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
}
if !p.disableDynamicListeners {
if brokerIdFound {
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
// Existing broker ID found, but with a different upstream broker
// Close existing listener, remove two mappings:
// * ID to removed upstream broker
// * removed upstream broker
idListenerConfig.Listener.Close()
p.lock.Lock()
delete(p.brokerIdToIdListenerConfig, brokerId)
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
p.lock.Unlock()
}
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
return p.ListenDynamicInstance(brokerAddress)
return p.ListenDynamicInstance(brokerAddress, brokerId)
}
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
}

func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) {
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
p.lock.Lock()
defer p.lock.Unlock()
// double check
if v, ok := p.brokerToListenerConfig[brokerAddress]; ok {
return util.SplitHostPort(v.AdvertisedAddress)
}

defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
var defaultListenerAddress string

if p.deterministicListeners {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
} else {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
}
}

cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
Expand All @@ -167,6 +192,7 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,

advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}

logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)

Expand Down
Loading