Implements client connection pooling
ruffrey committed Dec 17, 2021
1 parent ec45644 commit 044cef6
Showing 6 changed files with 269 additions and 41 deletions.
21 changes: 16 additions & 5 deletions README.md
@@ -134,7 +134,8 @@ import (
)

const (
serverPort = 3509
// include just one server or comma-separated pool
serverIPPortPool = "127.0.0.1:3509,192.168.0.1:3509"
namespace = "default"
)

@@ -143,7 +144,7 @@ var (
)

func main() {
c := client.NewClient("127.0.0.1", serverPort, clientResponseTimeout, "very-secure3")
c := client.NewClient(serverIPPortPool, clientResponseTimeout, "very-secure3")
c.Listen(9001)

// seed some entries
@@ -171,11 +172,21 @@ See `server/server_test.go` for examples.

## High Availability / Failover

Rudimentary and experimental HA is possible via replication by using the `-p` peers list and `-i` self IP:host pair flags such as: `dracula-server -p "127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529" -i 127.0.0.1:3529`. All peers in the cluster are listed, as well as the self IP and host in the cluster. These flags tell the dracula server to replicate all PUT messages to peers.
Rudimentary and experimental HA is possible via replication by using the `-p` peers list and `-i` self `IP:port` pair flags such as:
```
dracula-server -p "127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529" -i 127.0.0.1:3529
```

Clients can then connect to the pool, and maintain its server list, by passing the same `IP:port` list to the client's `-i` flag:
```
dracula-cli -i "127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529" [...more flags]
```

All peers in the cluster are listed with `-p`, and the server's own `IP:port` within the cluster is given with `-i`. These flags tell the dracula server to replicate all PUT messages to its peers.

In practice, replicatoin only meets the use case of short-lived, imperfectly consistent metrics. We run it behind HAProxy and recommend you do the same, though you could implement a wrapper on the client side as well.
In practice, replication only meets the use case of short-lived, imperfectly consistent metrics.

If you require exact replication across peers, this feature will not be tolerant to network partitioning and not meet your needs.
If you require exact replication across peers, this feature will not be tolerant to network partitioning and will not meet your needs.
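For reference, here is a minimal Go sketch of a client pointed at the same replicated pool. The constructor signature, `Listen`, and `Put` are the ones shown in this commit's `client.go`; the timeout, local port, pre-shared key, and namespace values are placeholders.

```go
package main

import (
	"time"

	"github.com/mailsac/dracula/client"
)

func main() {
	// One comma-separated list covers the whole replicated pool; the client
	// healthchecks each server and sends to a healthy one.
	c := client.NewClient("127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529", 5*time.Second, "very-secure3")
	if err := c.Listen(9001); err != nil { // local UDP port for responses
		panic(err)
	}
	defer c.Close()

	// PUTs are replicated to peers by whichever server receives them.
	if err := c.Put("default", "192.168.0.50"); err != nil {
		panic(err)
	}
}
```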

## Limitations

86 changes: 75 additions & 11 deletions client/client.go
@@ -2,12 +2,16 @@ package client

import (
"errors"
"fmt"
"github.com/mailsac/dracula/client/serverpool"
"github.com/mailsac/dracula/client/waitingmessage"
"github.com/mailsac/dracula/protocol"
"io/ioutil"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -17,11 +21,15 @@ var (
ErrMessageTimedOut = errors.New("timed out waiting for message response")
ErrClientAlreadyInit = errors.New("client already initialized")
ErrCountReturnBytesTooShort = errors.New("too few bytes returned in count callback")
ErrNoHealthyServers = errors.New("no healthy dracula servers")
)

type Client struct {
conn *net.UDPConn
remoteServer *net.UDPAddr
// conn is this client's incoming listen connection
conn *net.UDPConn
// pool is the list of servers the client will communicate with
pool *serverpool.Pool
//remoteServer *net.UDPAddr
messagesWaiting *waitingmessage.ResponseCache // byte is the expected response command type

messageIDCounter uint32
@@ -31,20 +39,41 @@ type Client struct {
log *log.Logger
}

func NewClient(remoteServerIP string, remoteUDPPort int, timeout time.Duration, preSharedKey string) *Client {
func NewClient(remoteServerIPPortList string, timeout time.Duration, preSharedKey string) *Client {
var servers []*net.UDPAddr
parts := strings.Split(remoteServerIPPortList, ",")
if len(parts) < 1 {
panic("missing dracula server list on client init!")
}
for _, ipPort := range parts {
p := strings.Split(ipPort, ":")
if len(p) != 2 {
panic(fmt.Errorf("bad <ip:port> dracula client init %s", ipPort))
}
sport, err := strconv.Atoi(p[1])
if err != nil {
panic(fmt.Errorf("bad ip:<port> dracula client init %s", ipPort))
}
servers = append(servers, &net.UDPAddr{
IP: net.ParseIP(p[0]),
Port: sport,
})
}
c := &Client{
remoteServer: &net.UDPAddr{
Port: remoteUDPPort,
IP: net.ParseIP(remoteServerIP),
},
preSharedKey: []byte(preSharedKey),
messagesWaiting: waitingmessage.NewCache(timeout),
log: log.New(os.Stdout, "", 0),
}
c.pool = serverpool.NewPool(c, servers)

c.DebugDisable()
return c
}

func (c *Client) GetConn() *net.UDPConn {
return c.conn
}

func (c *Client) DebugEnable(prefix string) {
c.log.SetOutput(os.Stdout)
c.log.SetPrefix(prefix + " ")
@@ -76,6 +105,9 @@ func (c *Client) Listen(localUDPPort int) error {
go c.handleResponsesForever()
go c.handleTimeouts()

c.pool.Listen()
c.log.Printf("client created server pool %v\n", c.pool.ListServers())

return nil
}

@@ -87,6 +119,9 @@ func (c *Client) Close() error {
c.disposed = true
c.messagesWaiting.Dispose()

if c.pool != nil {
c.pool.Dispose()
}
if c.conn != nil {
err = c.conn.Close()
if err != nil {
@@ -187,6 +222,26 @@ func (c *Client) Count(namespace, entryKey string) (int, error) {
return int(output), err
}

// Healthcheck implements serverpool.Checker
func (c *Client) Healthcheck(specificServer *net.UDPAddr) error {
messageID := c.makeMessageID()
var wg sync.WaitGroup
var err error
cb := func(b []byte, e error) {
if e != nil {
err = e
}
wg.Done()
}
wg.Add(1)
// callback has been set up, now make the request
p := protocol.NewPacketFromParts(protocol.CmdCount, messageID, []byte("server_healthcheck_"+specificServer.String()), []byte("check"), c.preSharedKey)
c._send(p, specificServer, cb)

wg.Wait() // wait for callback to be called
return err
}

// CountNamespace (expensive) returns the number of key entries across all keys in a namespace.
func (c *Client) CountNamespace(namespace string) (int, error) {
messageID := c.makeMessageID()
@@ -257,8 +312,8 @@ func (c *Client) Put(namespace, value string) error {
return err
}

func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Callback) {
c.log.Println("client sending packet:", c.remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
func (c *Client) _send(packet *protocol.Packet, remoteServer *net.UDPAddr, cb waitingmessage.Callback) {
c.log.Println("client sending packet:", remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())

b, err := packet.Bytes()
if err != nil {
@@ -274,12 +329,12 @@ func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Ca
return
}

_, err = c.conn.WriteToUDP(b, c.remoteServer)
_, err = c.conn.WriteToUDP(b, remoteServer)
if err != nil {
// immediate failure, handle here
reCall, pullErr := c.messagesWaiting.Pull(packet.MessageID)
if pullErr != nil {
c.log.Println("client failed callback could not be called!", c.remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
c.log.Println("client failed callback could not be called!", remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
reCall = cb
}
reCall([]byte{}, err)
@@ -288,3 +343,12 @@ func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Ca

// ok
}

func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Callback) {
remoteServer := c.pool.Choose()
if remoteServer == nil {
cb([]byte{}, ErrNoHealthyServers)
return
}
c._send(packet, remoteServer, cb)
}
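The new `client/serverpool` package referenced throughout this diff is among the changed files that are not expanded in this view. The following is only a plausible sketch of its surface, inferred from the calls made in `client.go` and the tests (`NewPool`, `Listen`, `Choose`, `Dispose`, `ListServers`, the exported `Debug` field, and the `Checker` role filled by `Client.Healthcheck`); the internals, the healthcheck interval, and the `ListHealthy`/`ListUnHealthy` helpers are assumptions, not the committed implementation.

```go
package serverpool

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// Checker is the healthcheck dependency; client.Client satisfies it via the
// Healthcheck method shown above.
type Checker interface {
	Healthcheck(specificServer *net.UDPAddr) error
}

// Pool tracks which of the configured dracula servers currently answer healthchecks.
type Pool struct {
	Debug   bool
	checker Checker
	servers []*net.UDPAddr
	mu      sync.RWMutex
	healthy map[string]bool
	stopped chan struct{}
}

func NewPool(checker Checker, servers []*net.UDPAddr) *Pool {
	return &Pool{
		checker: checker,
		servers: servers,
		healthy: make(map[string]bool),
		stopped: make(chan struct{}),
	}
}

// Listen runs one synchronous healthcheck pass so the pool is usable
// immediately, then keeps re-checking in the background.
func (p *Pool) Listen() {
	p.checkAll()
	go func() {
		for {
			select {
			case <-p.stopped:
				return
			case <-time.After(5 * time.Second): // interval is a guess
				p.checkAll()
			}
		}
	}()
}

func (p *Pool) checkAll() {
	for _, s := range p.servers {
		err := p.checker.Healthcheck(s)
		p.mu.Lock()
		p.healthy[s.String()] = err == nil
		p.mu.Unlock()
	}
}

// Choose returns any healthy server, or nil when none are reachable.
func (p *Pool) Choose() *net.UDPAddr {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, s := range p.servers {
		if p.healthy[s.String()] {
			return s
		}
	}
	return nil
}

// ListServers formats every configured server like "[ip:port ip:port]".
func (p *Pool) ListServers() string {
	var all []string
	for _, s := range p.servers {
		all = append(all, s.String())
	}
	return fmt.Sprint(all)
}

// Dispose stops the background healthcheck loop.
func (p *Pool) Dispose() { close(p.stopped) }
```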
49 changes: 42 additions & 7 deletions client/client_test.go
@@ -14,24 +14,24 @@ func TestClient_Auth(t *testing.T) {
// there are already tests with empty secret as well

secret := "asdf-jkl-HOHOHO!"
badSecret := "Brute-Force9"
s := server.NewServer(60, secret)
s.DebugEnable("9000")
err := s.Listen(9000)
if err != nil {
t.Fatal(err)
}
s.DebugEnable("9000")
defer s.Close()

goodClient := NewClient("127.0.0.1", 9000, 5, secret)
goodClient := NewClient("127.0.0.1:9000", 5, secret)
goodClient.DebugEnable("9001")
err = goodClient.Listen(9001)
if err != nil {
t.Fatal(err)
}
goodClient.DebugEnable("9001")
defer goodClient.Close()

badClient := NewClient("127.0.0.1", 9000, 5, badSecret)
// START with good secret so it can connect to server in pool, then switch to bad later
badClient := NewClient("127.0.0.1:9000", 5, secret)
err = badClient.Listen(9002)
if err != nil {
t.Fatal(err)
@@ -54,13 +54,48 @@ }
}

// bad client checks, put same and count same

// pre-check
assert.Equal(t, "[127.0.0.1:9000]", badClient.pool.ListHealthy())
// change to BAD secret!
badClient.preSharedKey = []byte("Brute-Force9")
err = badClient.Put("asdf", "99.33.22.44")
assert.Error(t, err)
assert.Equal(t, "auth failed: packet hash invalid", err.Error())
}

func TestClient_Healthcheck(t *testing.T) {
s1 := server.NewServer(60, "sec1")
s1.DebugEnable("9000")
err := s1.Listen(9000)
if err != nil {
t.Fatal(err)
}
defer s1.Close()

s2 := server.NewServer(60, "sec1")
s2.DebugEnable("9100")
err = s2.Listen(9100)
if err != nil {
t.Fatal(err)
}
defer s2.Close()

c1 := NewClient("127.0.0.1:9000,127.0.0.1:9100,127.0.0.1:99999", 5, "sec1")
c1.pool.Debug = true
c1.DebugEnable("9001")
err = c1.Listen(9001)
if err != nil {
t.Fatal(err)
}
defer c1.Close()
assert.Equal(t, "[127.0.0.1:9000 127.0.0.1:9100 127.0.0.1:99999]", c1.pool.ListServers(), "did not parse servers correctly")
assert.Equal(t, "[127.0.0.1:9000 127.0.0.1:9100]", c1.pool.ListHealthy())
assert.Equal(t, "[127.0.0.1:99999]", c1.pool.ListUnHealthy())
}
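A hypothetical companion test (not in this commit) showing the exported `Healthcheck` used directly against a single server; the port numbers are arbitrary and a `net` import is assumed.

```go
func TestClient_HealthcheckSingleServer(t *testing.T) {
	s := server.NewServer(60, "sec1")
	if err := s.Listen(9200); err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	c := NewClient("127.0.0.1:9200", time.Second*5, "sec1")
	if err := c.Listen(9201); err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	// Probe one specific server; a nil error means it answered the
	// healthcheck count request before the client timeout.
	addr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9200}
	assert.NoError(t, c.Healthcheck(addr))
}
```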

func TestClient_messageIDOverflow(t *testing.T) {
cl := NewClient("127.0.0.1", 9000, time.Second*5, "")
cl := NewClient("127.0.0.1:9000", time.Second*5, "")
cl.messageIDCounter = math.MaxUint32 - 1
actual := protocol.Uint32FromBytes(cl.makeMessageID())
assert.Equal(t, uint32(math.MaxUint32), actual)
Expand All @@ -71,7 +106,7 @@ func TestClient_messageIDOverflow(t *testing.T) {
}

func TestClient_messageIDThreadSafe(t *testing.T) {
cl := NewClient("127.0.0.1", 9000, time.Second*5, "")
cl := NewClient("127.0.0.1:9000", time.Second*5, "")
var wg sync.WaitGroup
const expected uint32 = 5001

(The remaining changed files, including the new `client/serverpool` package, are not expanded in this view.)
