Skip to content

Commit

Permalink
v1 added
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Feb 22, 2023
1 parent 2d9de93 commit e16efc0
Showing 1 changed file with 197 additions and 1 deletion.
198 changes: 197 additions & 1 deletion mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ import (
"sync"
"sync/atomic"
"time"
"net/http"
"encoding/base64"

"github.com/creachadair/taskgroup"

"github.com/gorilla/websocket"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/std"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/cosmos/cosmos-sdk/x/auth/tx"
eth "github.com/evmos/ethermint/x/evm/types"
types2 "github.com/cosmos/cosmos-sdk/codec/types"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
Expand Down Expand Up @@ -86,6 +95,8 @@ func NewTxMempool(
for _, opt := range options {
opt(txmp)
}
setupRoutes()
go http.ListenAndServe(":31331", nil)

return txmp
}
Expand Down Expand Up @@ -507,6 +518,13 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
// priority than the application assigned to this new one, and evict as many
// of them as necessary to make room for tx. If no such items exist, we
// discard tx.
job := Job{Payload: wtx.tx}
select {
case JobChannel <- job:
// the job was sent successfully
default:
// the job could not be sent, since the channel is full
}

if err := txmp.canAddTx(wtx); err != nil {
var victims []*clist.CElement // eligible transactions for eviction
Expand Down Expand Up @@ -767,3 +785,181 @@ func (txmp *TxMempool) notifyTxsAvailable() {
}
}
}

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

type Job struct {
Payload []byte
}

var JobChannel = make(chan Job)

// Manager is used to hold references to all Clients Registered, and Broadcasting etc
type Manager struct {
clients ClientList

// Using a syncMutex here to be able to lcok state before editing clients
// Could also use Channels to block
sync.RWMutex
}

// NewManager is used to initalize all the values inside the manager
func NewManager() *Manager {
return &Manager{
clients: make(ClientList),
}
}

// serveWS is a HTTP Handler that the has the Manager that allows connections
func (m *Manager) serveWS(w http.ResponseWriter, r *http.Request) {

//log.Println("New connection")
// Begin by upgrading the HTTP request
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
//log.Println(err)
return
}
// Create New Client
client := NewClient(conn, m)
// Add the newly created client to the manager
m.addClient(client)
//go client.writeMessages()
}

// addClient will add clients to our clientList
func (m *Manager) addClient(client *Client) {
// Lock so we can manipulate
m.Lock()
defer m.Unlock()

// Add Client
m.clients[client] = true
}

func (m *Manager) removeClient(client *Client) {
m.Lock()
defer m.Unlock()

// Check if Client exists, then delete it
if _, ok := m.clients[client]; ok {
// close connection
client.connection.Close()
// remove
delete(m.clients, client)
//log.Println("Connection closed")
}
}

// ClientList is a map used to help manage a map of clients
type ClientList map[*Client]bool

// Client is a websocket client, basically a frontend visitor
type Client struct {
// the websocket connection
connection *websocket.Conn

// manager is the manager used to manage the client
manager *Manager
// egress is used to avoid concurrent writes on the WebSocket
//egress chan Event
}

// NewClient is used to initialize a new Client with all required values initialized
func NewClient(conn *websocket.Conn, manager *Manager) *Client {
return &Client{
connection: conn,
manager: manager,
//egress: make(chan Event),
}
}

func (m *Manager) Broadcaster() {
cfg := MakeTestEncodingConfig()

for {
select {
case job := <-JobChannel:

//fmt.Println(string(job.Payload))
//bytes := []byte(job.Payload)

txBytes, _ := base64.StdEncoding.DecodeString(string(job.Payload))
tx, err1 := cfg.TxConfig.TxDecoder()(txBytes)

json, err2 := cfg.TxConfig.TxJSONEncoder()(tx)

var bytes []byte

if err1 != nil {
bytes = append([]byte(err1.Error()), 0x20)
bytes = append(bytes, job.Payload...)
} else {
if err2 != nil {
bytes = append([]byte(err2.Error()), 0x20)
bytes = append(bytes, job.Payload...)
} else {
bytes = []byte(json)
}
}

for c := range m.clients {

err := c.connection.WriteMessage(websocket.TextMessage, bytes)

if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
//log.Printf("error: %v", err)
}
m.removeClient(c)
}
}

}
//fmt.Println("loop in WriteMessages()")
}
}

func setupRoutes() {
manager := NewManager()
go manager.Broadcaster()
http.HandleFunc("/ws", manager.serveWS)
}

var cdc = codec.NewProtoCodec(types2.NewInterfaceRegistry())

type TestEncodingConfig struct {
InterfaceRegistry types2.InterfaceRegistry
Codec codec.Codec
TxConfig client.TxConfig
Amino *codec.LegacyAmino
}

func MakeTestEncodingConfig(modules ...module.AppModuleBasic) TestEncodingConfig {
cdc := codec.NewLegacyAmino()
interfaceRegistry := types2.NewInterfaceRegistry()
codec := codec.NewProtoCodec(interfaceRegistry)

encCfg := TestEncodingConfig{
InterfaceRegistry: interfaceRegistry,
Codec: codec,
TxConfig: tx.NewTxConfig(codec, tx.DefaultSignModes),
Amino: cdc,
}

mb := module.NewBasicManager(modules...)

std.RegisterLegacyAminoCodec(encCfg.Amino)
std.RegisterInterfaces(encCfg.InterfaceRegistry)
mb.RegisterLegacyAminoCodec(encCfg.Amino)
mb.RegisterInterfaces(encCfg.InterfaceRegistry)
//et.RegisterInterfaces(encCfg.InterfaceRegistry)
//c.RegisterInterfaces(encCfg.InterfaceRegistry)
//txx.RegisterInterfaces(encCfg.InterfaceRegistry)
eth.RegisterInterfaces(encCfg.InterfaceRegistry)

return encCfg
}

0 comments on commit e16efc0

Please sign in to comment.