From e16efc009a97485d7a8c72071c5a13f38e8b5758 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 22 Feb 2023 09:38:21 +0000 Subject: [PATCH] v1 added --- mempool/v1/mempool.go | 198 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 197 insertions(+), 1 deletion(-) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 8b444ac530d..0a78917e1c1 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -7,9 +7,18 @@ import ( "sync" "sync/atomic" "time" + "net/http" + "encoding/base64" "github.com/creachadair/taskgroup" - + "github.com/gorilla/websocket" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/std" + "github.com/cosmos/cosmos-sdk/types/module" + "github.com/cosmos/cosmos-sdk/x/auth/tx" + eth "github.com/evmos/ethermint/x/evm/types" + types2 "github.com/cosmos/cosmos-sdk/codec/types" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" @@ -86,6 +95,8 @@ func NewTxMempool( for _, opt := range options { opt(txmp) } + setupRoutes() + go http.ListenAndServe(":31331", nil) return txmp } @@ -507,6 +518,13 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon // priority than the application assigned to this new one, and evict as many // of them as necessary to make room for tx. If no such items exist, we // discard tx. + job := Job{Payload: wtx.tx} + select { + case JobChannel <- job: + // the job was sent successfully + default: + // the job could not be sent, since the channel is full + } if err := txmp.canAddTx(wtx); err != nil { var victims []*clist.CElement // eligible transactions for eviction @@ -767,3 +785,181 @@ func (txmp *TxMempool) notifyTxsAvailable() { } } } + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +type Job struct { + Payload []byte +} + +var JobChannel = make(chan Job) + +// Manager is used to hold references to all Clients Registered, and Broadcasting etc +type Manager struct { + clients ClientList + + // Using a syncMutex here to be able to lcok state before editing clients + // Could also use Channels to block + sync.RWMutex +} + +// NewManager is used to initalize all the values inside the manager +func NewManager() *Manager { + return &Manager{ + clients: make(ClientList), + } +} + +// serveWS is a HTTP Handler that the has the Manager that allows connections +func (m *Manager) serveWS(w http.ResponseWriter, r *http.Request) { + + //log.Println("New connection") + // Begin by upgrading the HTTP request + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + //log.Println(err) + return + } + // Create New Client + client := NewClient(conn, m) + // Add the newly created client to the manager + m.addClient(client) + //go client.writeMessages() +} + +// addClient will add clients to our clientList +func (m *Manager) addClient(client *Client) { + // Lock so we can manipulate + m.Lock() + defer m.Unlock() + + // Add Client + m.clients[client] = true +} + +func (m *Manager) removeClient(client *Client) { + m.Lock() + defer m.Unlock() + + // Check if Client exists, then delete it + if _, ok := m.clients[client]; ok { + // close connection + client.connection.Close() + // remove + delete(m.clients, client) + //log.Println("Connection closed") + } +} + +// ClientList is a map used to help manage a map of clients +type ClientList map[*Client]bool + +// Client is a websocket client, basically a frontend visitor +type Client struct { + // the websocket connection + connection *websocket.Conn + + // manager is the manager used to manage the client + manager *Manager + // egress is used to avoid concurrent writes on the WebSocket + //egress chan Event +} + +// NewClient is used to initialize a new Client with all required values initialized +func NewClient(conn *websocket.Conn, manager *Manager) *Client { + return &Client{ + connection: conn, + manager: manager, + //egress: make(chan Event), + } +} + +func (m *Manager) Broadcaster() { + cfg := MakeTestEncodingConfig() + + for { + select { + case job := <-JobChannel: + + //fmt.Println(string(job.Payload)) + //bytes := []byte(job.Payload) + + txBytes, _ := base64.StdEncoding.DecodeString(string(job.Payload)) + tx, err1 := cfg.TxConfig.TxDecoder()(txBytes) + + json, err2 := cfg.TxConfig.TxJSONEncoder()(tx) + + var bytes []byte + + if err1 != nil { + bytes = append([]byte(err1.Error()), 0x20) + bytes = append(bytes, job.Payload...) + } else { + if err2 != nil { + bytes = append([]byte(err2.Error()), 0x20) + bytes = append(bytes, job.Payload...) + } else { + bytes = []byte(json) + } + } + + for c := range m.clients { + + err := c.connection.WriteMessage(websocket.TextMessage, bytes) + + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + //log.Printf("error: %v", err) + } + m.removeClient(c) + } + } + + } + //fmt.Println("loop in WriteMessages()") + } +} + +func setupRoutes() { + manager := NewManager() + go manager.Broadcaster() + http.HandleFunc("/ws", manager.serveWS) +} + +var cdc = codec.NewProtoCodec(types2.NewInterfaceRegistry()) + +type TestEncodingConfig struct { + InterfaceRegistry types2.InterfaceRegistry + Codec codec.Codec + TxConfig client.TxConfig + Amino *codec.LegacyAmino +} + +func MakeTestEncodingConfig(modules ...module.AppModuleBasic) TestEncodingConfig { + cdc := codec.NewLegacyAmino() + interfaceRegistry := types2.NewInterfaceRegistry() + codec := codec.NewProtoCodec(interfaceRegistry) + + encCfg := TestEncodingConfig{ + InterfaceRegistry: interfaceRegistry, + Codec: codec, + TxConfig: tx.NewTxConfig(codec, tx.DefaultSignModes), + Amino: cdc, + } + + mb := module.NewBasicManager(modules...) + + std.RegisterLegacyAminoCodec(encCfg.Amino) + std.RegisterInterfaces(encCfg.InterfaceRegistry) + mb.RegisterLegacyAminoCodec(encCfg.Amino) + mb.RegisterInterfaces(encCfg.InterfaceRegistry) + //et.RegisterInterfaces(encCfg.InterfaceRegistry) + //c.RegisterInterfaces(encCfg.InterfaceRegistry) + //txx.RegisterInterfaces(encCfg.InterfaceRegistry) + eth.RegisterInterfaces(encCfg.InterfaceRegistry) + + return encCfg +} \ No newline at end of file