diff --git a/middleware/src/electrum/electrum.go b/middleware/src/electrum/electrum.go index 0a5221a5..ff3330bd 100644 --- a/middleware/src/electrum/electrum.go +++ b/middleware/src/electrum/electrum.go @@ -1,11 +1,11 @@ +// Package electrum reads messages from a connected electrum server over tcp and passes the read value to a callback function. It also sends messages to the electrum server over tcp package electrum import ( "bufio" "fmt" + "log" "net" - - "github.com/ethereum/go-ethereum/log" ) // Electrum makes a connection to an Electrum server and proxies messages. @@ -28,19 +28,20 @@ func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, err return electrum, nil } +// read raw message from Electrum server func (electrum *Electrum) read() { reader := bufio.NewReader(electrum.connection) for { line, err := reader.ReadBytes(byte('\n')) if err != nil { - log.Error(fmt.Sprintf("electrum read error: %v", err)) + log.Println(fmt.Sprintf("electrum read error: %v", err)) break } electrum.onMessageReceived(line) } } -// Send sends a raw message to the Electrum server. +// Send a raw message to the Electrum server. func (electrum *Electrum) Send(msg []byte) error { _, err := electrum.connection.Write(msg) return err diff --git a/middleware/src/handlers/handlers.go b/middleware/src/handlers/handlers.go index a7c78109..61a66e80 100644 --- a/middleware/src/handlers/handlers.go +++ b/middleware/src/handlers/handlers.go @@ -7,7 +7,6 @@ import ( "sync" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" - "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" noisemanager "github.com/digitalbitbox/bitbox-base/middleware/src/noise" "github.com/digitalbitbox/bitbox-base/middleware/src/rpcserver" @@ -112,15 +111,11 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { onElectrumMessageReceived := func(msg []byte) { writeChan <- append([]byte{opElectrum}, msg...) } - electrumClient, err := electrum.NewElectrum(handlers.electrumAddress, onElectrumMessageReceived) - if err != nil { - log.Println(err.Error() + "Electrum connection failed to initialize") - return - } server := rpcserver.NewRPCServer( handlers.middleware, - electrumClient, + handlers.electrumAddress, + onElectrumMessageReceived, ) go func() { for { @@ -129,11 +124,8 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { } }() handlers.mu.Lock() - handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan() - onMessageReceived := func(msg []byte) { - server.RPCConnection.ReadChan() <- msg - } - handlers.runWebsocket(ws, onMessageReceived, writeChan, handlers.nClients) + handlers.clientsMap[handlers.nClients] = writeChan + handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), writeChan, handlers.nClients) handlers.nClients++ handlers.mu.Unlock() go server.Serve() diff --git a/middleware/src/handlers/handlers_test.go b/middleware/src/handlers/handlers_test.go index cc2165c5..2f958b8d 100644 --- a/middleware/src/handlers/handlers_test.go +++ b/middleware/src/handlers/handlers_test.go @@ -49,7 +49,6 @@ func TestRootHandler(t *testing.T) { } func TestWebsocketHandler(t *testing.T) { - return argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" argumentMap["bitcoinRPCPassword"] = "password" diff --git a/middleware/src/handlers/websocket.go b/middleware/src/handlers/websocket.go index 081e1b7d..41462879 100644 --- a/middleware/src/handlers/websocket.go +++ b/middleware/src/handlers/websocket.go @@ -18,7 +18,7 @@ const ( // The goroutines close client upon exit or dues to a send/receive error. func (handlers *Handlers) runWebsocket( client *websocket.Conn, - onMessageReceived func([]byte), + readChan chan<- []byte, writeChan <-chan []byte, clientID int) { @@ -63,7 +63,7 @@ func (handlers *Handlers) runWebsocket( return } log.Println(string(messageDecrypted)) - onMessageReceived(messageDecrypted) + readChan <- messageDecrypted } } diff --git a/middleware/src/middleware.go b/middleware/src/middleware.go index a587dc7e..2ce4f312 100644 --- a/middleware/src/middleware.go +++ b/middleware/src/middleware.go @@ -18,9 +18,10 @@ const ( // Middleware connects to services on the base with provided parrameters and emits events for the handler. type Middleware struct { - info SampleInfoResponse - environment system.Environment - events chan []byte + info SampleInfoResponse + environment system.Environment + events chan []byte + electrumEvents chan []byte } // NewMiddleware returns a new instance of the middleware @@ -28,7 +29,8 @@ func NewMiddleware(argumentMap map[string]string) *Middleware { middleware := &Middleware{ environment: system.NewEnvironment(argumentMap), //TODO(TheCharlatan) find a better way to increase the channel size - events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + electrumEvents: make(chan []byte), info: SampleInfoResponse{ Blocks: 0, Difficulty: 0.0, diff --git a/middleware/src/rpcserver/rpcserver.go b/middleware/src/rpcserver/rpcserver.go index 6f066dc4..f1a64eb5 100644 --- a/middleware/src/rpcserver/rpcserver.go +++ b/middleware/src/rpcserver/rpcserver.go @@ -5,6 +5,7 @@ import ( "net/rpc" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" + "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" ) type rpcConn struct { @@ -55,17 +56,21 @@ type Electrum interface { // RPCServer provides rpc calls to the middleware type RPCServer struct { - middleware Middleware - electrum Electrum - RPCConnection *rpcConn + middleware Middleware + electrum Electrum + electrumAddress string + onElectrumMessageReceived func(msg []byte) + RPCConnection *rpcConn } // NewRPCServer returns a new RPCServer -func NewRPCServer(middleware Middleware, electrum Electrum) *RPCServer { +func NewRPCServer(middleware Middleware, electrumAddress string, onElectrumMessageReceived func(msg []byte)) *RPCServer { //, electrum Electrum) *RPCServer { server := &RPCServer{ - middleware: middleware, - electrum: electrum, - RPCConnection: newRPCConn(), + middleware: middleware, + //electrum: electrum, + electrumAddress: electrumAddress, + onElectrumMessageReceived: onElectrumMessageReceived, + RPCConnection: newRPCConn(), } err := rpc.Register(server) if err != nil { @@ -96,10 +101,18 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes return nil } -// ElectrumSend sends a message to Electrum on the connection owned by the client. +// ElectrumSend sends a message to the Electrum server on the connection owned by the client. func (server *RPCServer) ElectrumSend( args struct{ Msg []byte }, reply *struct{}) error { + if server.electrum == nil { + electrumClient, err := electrum.NewElectrum(server.electrumAddress, server.onElectrumMessageReceived) + server.electrum = electrumClient + if err != nil { + log.Println(err.Error() + "Electrum connection failed to initialize") + return err + } + } return server.electrum.Send(args.Msg) } diff --git a/middleware/src/rpcserver/rpcserver_test.go b/middleware/src/rpcserver/rpcserver_test.go index 297d0d20..d91360d5 100644 --- a/middleware/src/rpcserver/rpcserver_test.go +++ b/middleware/src/rpcserver/rpcserver_test.go @@ -29,17 +29,6 @@ func (conn *rpcConn) Close() error { return nil } -type electrumMock struct { - send func([]byte) error -} - -func (e *electrumMock) Send(msg []byte) error { - if e.send != nil { - return e.send(msg) - } - return nil -} - func TestRPCServer(t *testing.T) { argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = "user" @@ -50,7 +39,7 @@ func TestRPCServer(t *testing.T) { argumentMap["network"] = "testnet" argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - rpcServer := rpcserver.NewRPCServer(middlewareInstance, &electrumMock{}) + rpcServer := rpcserver.NewRPCServer(middlewareInstance, "localhost:80801", func([]byte) {}) serverWriteChan := rpcServer.RPCConnection.WriteChan() serverReadChan := rpcServer.RPCConnection.ReadChan() @@ -73,9 +62,11 @@ func TestRPCServer(t *testing.T) { msgRequest := <-clientWriteChan serverReadChan <- msgRequest msgResponse := <-serverWriteChan + //t.Logf("significant byte: %s", string(msgResponse[0])) t.Logf("response message %s", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + //t.Logf("significant byte: %s", string(msgResponse[1])) + clientReadChan <- msgResponse wg.Wait() t.Logf("reply: %v", reply) require.Equal(t, "testnet", reply.Network) @@ -96,7 +87,7 @@ func TestRPCServer(t *testing.T) { msgResponse = <-serverWriteChan t.Logf("Resync Bitcoin Response %q", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + clientReadChan <- msgResponse wg.Wait() require.Equal(t, false, resyncReply.Success) }