From e5a098d21dabd73733b15268c013160b1c147b7d Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Fri, 2 Aug 2019 10:29:06 +0200 Subject: [PATCH] middleware: Electrum Tunnel Unit tests This commit adapts the unit tests to the changes introduced in the previous commit for the electrum tunneling. The electrum client is now only spawned with the first electrum rpc call. The electrum client opens a new tcp connection to the electrum server for each connected client of the middleware. --- middleware/src/electrum/electrum.go | 9 ++++--- middleware/src/handlers/handlers.go | 16 +++--------- middleware/src/handlers/handlers_test.go | 1 - middleware/src/handlers/websocket.go | 4 +-- middleware/src/middleware.go | 10 +++++--- middleware/src/rpcserver/rpcserver.go | 29 ++++++++++++++++------ middleware/src/rpcserver/rpcserver_test.go | 19 ++++---------- 7 files changed, 43 insertions(+), 45 deletions(-) diff --git a/middleware/src/electrum/electrum.go b/middleware/src/electrum/electrum.go index 0a5221a5..ff3330bd 100644 --- a/middleware/src/electrum/electrum.go +++ b/middleware/src/electrum/electrum.go @@ -1,11 +1,11 @@ +// Package electrum reads messages from a connected electrum server over tcp and passes the read value to a callback function. It also sends messages to the electrum server over tcp package electrum import ( "bufio" "fmt" + "log" "net" - - "github.com/ethereum/go-ethereum/log" ) // Electrum makes a connection to an Electrum server and proxies messages. @@ -28,19 +28,20 @@ func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, err return electrum, nil } +// read raw message from Electrum server func (electrum *Electrum) read() { reader := bufio.NewReader(electrum.connection) for { line, err := reader.ReadBytes(byte('\n')) if err != nil { - log.Error(fmt.Sprintf("electrum read error: %v", err)) + log.Println(fmt.Sprintf("electrum read error: %v", err)) break } electrum.onMessageReceived(line) } } -// Send sends a raw message to the Electrum server. +// Send a raw message to the Electrum server. func (electrum *Electrum) Send(msg []byte) error { _, err := electrum.connection.Write(msg) return err diff --git a/middleware/src/handlers/handlers.go b/middleware/src/handlers/handlers.go index a7c78109..61a66e80 100644 --- a/middleware/src/handlers/handlers.go +++ b/middleware/src/handlers/handlers.go @@ -7,7 +7,6 @@ import ( "sync" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" - "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" noisemanager "github.com/digitalbitbox/bitbox-base/middleware/src/noise" "github.com/digitalbitbox/bitbox-base/middleware/src/rpcserver" @@ -112,15 +111,11 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { onElectrumMessageReceived := func(msg []byte) { writeChan <- append([]byte{opElectrum}, msg...) } - electrumClient, err := electrum.NewElectrum(handlers.electrumAddress, onElectrumMessageReceived) - if err != nil { - log.Println(err.Error() + "Electrum connection failed to initialize") - return - } server := rpcserver.NewRPCServer( handlers.middleware, - electrumClient, + handlers.electrumAddress, + onElectrumMessageReceived, ) go func() { for { @@ -129,11 +124,8 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { } }() handlers.mu.Lock() - handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan() - onMessageReceived := func(msg []byte) { - server.RPCConnection.ReadChan() <- msg - } - handlers.runWebsocket(ws, onMessageReceived, writeChan, handlers.nClients) + handlers.clientsMap[handlers.nClients] = writeChan + handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), writeChan, handlers.nClients) handlers.nClients++ handlers.mu.Unlock() go server.Serve() diff --git a/middleware/src/handlers/handlers_test.go b/middleware/src/handlers/handlers_test.go index cc2165c5..2f958b8d 100644 --- a/middleware/src/handlers/handlers_test.go +++ b/middleware/src/handlers/handlers_test.go @@ -49,7 +49,6 @@ func TestRootHandler(t *testing.T) { } func TestWebsocketHandler(t *testing.T) { - return argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" argumentMap["bitcoinRPCPassword"] = "password" diff --git a/middleware/src/handlers/websocket.go b/middleware/src/handlers/websocket.go index 081e1b7d..41462879 100644 --- a/middleware/src/handlers/websocket.go +++ b/middleware/src/handlers/websocket.go @@ -18,7 +18,7 @@ const ( // The goroutines close client upon exit or dues to a send/receive error. func (handlers *Handlers) runWebsocket( client *websocket.Conn, - onMessageReceived func([]byte), + readChan chan<- []byte, writeChan <-chan []byte, clientID int) { @@ -63,7 +63,7 @@ func (handlers *Handlers) runWebsocket( return } log.Println(string(messageDecrypted)) - onMessageReceived(messageDecrypted) + readChan <- messageDecrypted } } diff --git a/middleware/src/middleware.go b/middleware/src/middleware.go index a587dc7e..2ce4f312 100644 --- a/middleware/src/middleware.go +++ b/middleware/src/middleware.go @@ -18,9 +18,10 @@ const ( // Middleware connects to services on the base with provided parrameters and emits events for the handler. type Middleware struct { - info SampleInfoResponse - environment system.Environment - events chan []byte + info SampleInfoResponse + environment system.Environment + events chan []byte + electrumEvents chan []byte } // NewMiddleware returns a new instance of the middleware @@ -28,7 +29,8 @@ func NewMiddleware(argumentMap map[string]string) *Middleware { middleware := &Middleware{ environment: system.NewEnvironment(argumentMap), //TODO(TheCharlatan) find a better way to increase the channel size - events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + electrumEvents: make(chan []byte), info: SampleInfoResponse{ Blocks: 0, Difficulty: 0.0, diff --git a/middleware/src/rpcserver/rpcserver.go b/middleware/src/rpcserver/rpcserver.go index 6f066dc4..f1a64eb5 100644 --- a/middleware/src/rpcserver/rpcserver.go +++ b/middleware/src/rpcserver/rpcserver.go @@ -5,6 +5,7 @@ import ( "net/rpc" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" + "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" ) type rpcConn struct { @@ -55,17 +56,21 @@ type Electrum interface { // RPCServer provides rpc calls to the middleware type RPCServer struct { - middleware Middleware - electrum Electrum - RPCConnection *rpcConn + middleware Middleware + electrum Electrum + electrumAddress string + onElectrumMessageReceived func(msg []byte) + RPCConnection *rpcConn } // NewRPCServer returns a new RPCServer -func NewRPCServer(middleware Middleware, electrum Electrum) *RPCServer { +func NewRPCServer(middleware Middleware, electrumAddress string, onElectrumMessageReceived func(msg []byte)) *RPCServer { //, electrum Electrum) *RPCServer { server := &RPCServer{ - middleware: middleware, - electrum: electrum, - RPCConnection: newRPCConn(), + middleware: middleware, + //electrum: electrum, + electrumAddress: electrumAddress, + onElectrumMessageReceived: onElectrumMessageReceived, + RPCConnection: newRPCConn(), } err := rpc.Register(server) if err != nil { @@ -96,10 +101,18 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes return nil } -// ElectrumSend sends a message to Electrum on the connection owned by the client. +// ElectrumSend sends a message to the Electrum server on the connection owned by the client. func (server *RPCServer) ElectrumSend( args struct{ Msg []byte }, reply *struct{}) error { + if server.electrum == nil { + electrumClient, err := electrum.NewElectrum(server.electrumAddress, server.onElectrumMessageReceived) + server.electrum = electrumClient + if err != nil { + log.Println(err.Error() + "Electrum connection failed to initialize") + return err + } + } return server.electrum.Send(args.Msg) } diff --git a/middleware/src/rpcserver/rpcserver_test.go b/middleware/src/rpcserver/rpcserver_test.go index 297d0d20..d91360d5 100644 --- a/middleware/src/rpcserver/rpcserver_test.go +++ b/middleware/src/rpcserver/rpcserver_test.go @@ -29,17 +29,6 @@ func (conn *rpcConn) Close() error { return nil } -type electrumMock struct { - send func([]byte) error -} - -func (e *electrumMock) Send(msg []byte) error { - if e.send != nil { - return e.send(msg) - } - return nil -} - func TestRPCServer(t *testing.T) { argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" @@ -50,7 +39,7 @@ func TestRPCServer(t *testing.T) { argumentMap["network"] = "testnet" argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - rpcServer := rpcserver.NewRPCServer(middlewareInstance, &electrumMock{}) + rpcServer := rpcserver.NewRPCServer(middlewareInstance, "localhost:80801", func([]byte) {}) serverWriteChan := rpcServer.RPCConnection.WriteChan() serverReadChan := rpcServer.RPCConnection.ReadChan() @@ -73,9 +62,11 @@ func TestRPCServer(t *testing.T) { msgRequest := <-clientWriteChan serverReadChan <- msgRequest msgResponse := <-serverWriteChan + //t.Logf("significant byte: %s", string(msgResponse[0])) t.Logf("response message %s", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + //t.Logf("significant byte: %s", string(msgResponse[1])) + clientReadChan <- msgResponse wg.Wait() t.Logf("reply: %v", reply) require.Equal(t, "testnet", reply.Network) @@ -96,7 +87,7 @@ func TestRPCServer(t *testing.T) { msgResponse = <-serverWriteChan t.Logf("Resync Bitcoin Response %q", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + clientReadChan <- msgResponse wg.Wait() require.Equal(t, false, resyncReply.Success) }