Skip to content

Commit

Permalink
feat: connection management in web ui (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Feb 4, 2024
1 parent 77a1a71 commit 751ee92
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 91 deletions.
89 changes: 74 additions & 15 deletions internal/cmgr/cmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,55 @@ import (
"github.com/Ehco1996/ehco/internal/conn"
)

const (
ConnectionTypeActive = "active"
ConnectionTypeClosed = "closed"
)

// connection manager interface
type Cmgr interface {
ListConnections(page, pageSize int) []conn.RelayConn
ListConnections(connType string, page, pageSize int) []conn.RelayConn

// AddConnection adds a connection to the connection manager.
AddConnection(conn conn.RelayConn)

CountConnection() int
// RemoveConnection removes a connection from the connection manager.
RemoveConnection(conn conn.RelayConn)

// CountConnection returns the number of active connections.
CountConnection(connType string) int
}

type cmgrImpl struct {
lock sync.RWMutex

// k: relay label, v: connection list
connectionsMap map[string][]conn.RelayConn
activeConnectionsMap map[string][]conn.RelayConn
closedConnectionsMap map[string][]conn.RelayConn
}

func NewCmgr() Cmgr {
return &cmgrImpl{
connectionsMap: make(map[string][]conn.RelayConn),
activeConnectionsMap: make(map[string][]conn.RelayConn),
closedConnectionsMap: make(map[string][]conn.RelayConn),
}
}

func (cm *cmgrImpl) ListConnections(page, pageSize int) []conn.RelayConn {
func (cm *cmgrImpl) ListConnections(connType string, page, pageSize int) []conn.RelayConn {
cm.lock.RLock()
defer cm.lock.RUnlock()

total := cm.CountConnection()
var total int
var m map[string][]conn.RelayConn

if connType == ConnectionTypeActive {
total = cm.countActiveConnection()
m = cm.activeConnectionsMap
} else {
total = cm.countClosedConnection()
m = cm.closedConnectionsMap

}

start := (page - 1) * pageSize
if start > total {
Expand All @@ -44,17 +65,16 @@ func (cm *cmgrImpl) ListConnections(page, pageSize int) []conn.RelayConn {
if end > total {
end = total
}

relayLabelList := make([]string, 0, len(cm.connectionsMap))
for k := range cm.connectionsMap {
relayLabelList := make([]string, 0, len(m))
for k := range m {
relayLabelList = append(relayLabelList, k)
}
// Sort the relay label list to make the result more predictable
sort.Strings(relayLabelList)

var conns []conn.RelayConn
for _, label := range relayLabelList {
conns = append(conns, cm.connectionsMap[label]...)
conns = append(conns, m[label]...)
}
if end > len(conns) {
end = len(conns) // Don't let the end index be more than slice length
Expand All @@ -67,17 +87,56 @@ func (cm *cmgrImpl) AddConnection(c conn.RelayConn) {
defer cm.lock.Unlock()
label := c.GetRelayLabel()

if _, ok := cm.connectionsMap[label]; !ok {
cm.connectionsMap[label] = []conn.RelayConn{}
if _, ok := cm.activeConnectionsMap[label]; !ok {
cm.activeConnectionsMap[label] = []conn.RelayConn{}
}
cm.activeConnectionsMap[label] = append(cm.activeConnectionsMap[label], c)
}

func (cm *cmgrImpl) RemoveConnection(c conn.RelayConn) {
cm.lock.Lock()
defer cm.lock.Unlock()

label := c.GetRelayLabel()
connections, ok := cm.activeConnectionsMap[label]
if !ok {
return // If the label doesn't exist, nothing to remove
}
cm.connectionsMap[label] = append(cm.connectionsMap[label], c)

// Find and remove the connection from activeConnectionsMap
for i, activeConn := range connections {
if activeConn == c {
cm.activeConnectionsMap[label] = append(connections[:i], connections[i+1:]...)
break
}
}
// Add to closedConnectionsMap
cm.closedConnectionsMap[label] = append(cm.closedConnectionsMap[label], c)
}

func (cm *cmgrImpl) CountConnection(connType string) int {
if connType == ConnectionTypeActive {
return cm.countActiveConnection()
} else {
return cm.countClosedConnection()
}
}

func (cm *cmgrImpl) countActiveConnection() int {
cm.lock.RLock()
defer cm.lock.RUnlock()
cnt := 0
for _, v := range cm.activeConnectionsMap {
cnt += len(v)
}
return cnt
}

func (cm *cmgrImpl) CountConnection() int {
func (cm *cmgrImpl) countClosedConnection() int {
cm.lock.RLock()
defer cm.lock.RUnlock()
cnt := 0
for _, v := range cm.connectionsMap {
for _, v := range cm.closedConnectionsMap {
cnt += len(v)
}
return cnt
Expand Down
22 changes: 19 additions & 3 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conn
import (
"fmt"
"net"
"time"

"go.uber.org/zap"
)
Expand All @@ -19,7 +20,7 @@ type RelayConn interface {
func NewRelayConn(relayName string, clientConn, remoteConn net.Conn) RelayConn {
return &relayConnImpl{
RelayLabel: relayName,
Stats: &Stats{},
Stats: &Stats{Up: 0, Down: 0},

clientConn: clientConn,
remoteConn: remoteConn,
Expand All @@ -29,7 +30,11 @@ func NewRelayConn(relayName string, clientConn, remoteConn net.Conn) RelayConn {
type relayConnImpl struct {
RelayLabel string `json:"relay_label"`
Closed bool `json:"closed"`
Stats *Stats `json:"stats"`

StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`

Stats *Stats `json:"stats"`

clientConn net.Conn
remoteConn net.Conn
Expand All @@ -52,20 +57,31 @@ func (rc *relayConnImpl) Transport(remoteLabel string) error {
remoteLabel: remoteLabel,
underlyingConn: rc.remoteConn,
}

rc.StartTime = time.Now()
err := CopyConn(c1, c2)
if err != nil {
cl.Error("transport error", zap.Error(err))
}
cl.Debug("transport end", zap.String("stats", rc.Stats.String()))
rc.Closed = true
rc.EndTime = time.Now()
return err
}

func (rc *relayConnImpl) GetTime() string {
if rc.EndTime.IsZero() {
return fmt.Sprintf("%s - N/A", rc.StartTime.Format(time.Stamp))
}
return fmt.Sprintf("%s - %s", rc.StartTime.Format(time.Stamp), rc.EndTime.Format(time.Stamp))
}

func (rc *relayConnImpl) Name() string {
return fmt.Sprintf("c1:[%s] c2:[%s]", connectionName(rc.clientConn), connectionName(rc.remoteConn))
}

func (rc *relayConnImpl) Flow() string {
return fmt.Sprintf("%s <-> %s", rc.clientConn.LocalAddr(), rc.remoteConn.RemoteAddr())
}
func (rc *relayConnImpl) GetRelayLabel() string {
return rc.RelayLabel
}
8 changes: 4 additions & 4 deletions internal/conn/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ type Stats struct {
Down int64 `json:"down"`
}

func (s *Stats) String() string {
return fmt.Sprintf("up: %s, down: %s", bytes.PrettyByteSize(float64(s.Up)), bytes.PrettyByteSize(float64(s.Down)))
}

func (s *Stats) Record(up, down int64) {
s.Up += up
s.Down += down
}

func (s *Stats) String() string {
return fmt.Sprintf("up: %s, down: %s", bytes.PrettyByteSize(float64(s.Up)), bytes.PrettyByteSize(float64(s.Down)))
}

type metricsConn struct {
net.Conn

Expand Down
1 change: 1 addition & 0 deletions internal/transporter/mtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (s *MTCP) HandleTCPConn(c net.Conn, remote *lb.Node) error {
s.l.Infof("HandleTCPConn from:%s to:%s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, mtcpc)
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
}

Expand Down
1 change: 1 addition & 0 deletions internal/transporter/mwss.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (s *Mwss) HandleTCPConn(c net.Conn, remote *lb.Node) error {
s.l.Infof("HandleTCPConn from:%s to:%s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, mwsc)
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
}

Expand Down
1 change: 1 addition & 0 deletions internal/transporter/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,6 @@ func (raw *Raw) HandleTCPConn(c net.Conn, remote *lb.Node) error {
raw.l.Infof("HandleTCPConn from %s to %s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(raw.relayLabel, c, rc)
raw.cmgr.AddConnection(relayConn)
defer raw.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
}
1 change: 1 addition & 0 deletions internal/transporter/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *Ws) HandleTCPConn(c net.Conn, remote *lb.Node) error {
s.l.Infof("HandleTCPConn from %s to %s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, wsc)
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
}

Expand Down
1 change: 1 addition & 0 deletions internal/transporter/wss.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (s *Wss) HandleTCPConn(c net.Conn, remote *lb.Node) error {

relayConn := conn.NewRelayConn(s.relayLabel, c, wssc)
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
}

Expand Down
31 changes: 20 additions & 11 deletions internal/web/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,36 @@ func (s *Server) ListConnections(c echo.Context) error {
if err != nil || page < 1 {
page = 1
}

pageSizeStr := c.QueryParam("page_size")
pageSize, err := strconv.Atoi(pageSizeStr)
if err != nil || pageSize < 1 {
pageSize = defaultPageSize
}

total := s.connMgr.CountConnection()

perv := 1
connType := c.QueryParam("conn_type")
total := s.connMgr.CountConnection(connType)
perv := 0
if page > 1 {
perv = page - 1
}
next := page
if page*pageSize < total {
next := 0
if page*pageSize < total && page*pageSize > 0 {
next = page + 1
}

activeCount := s.connMgr.CountConnection("active")
closedCount := s.connMgr.CountConnection("closed")

return c.Render(http.StatusOK, "connection.html", map[string]interface{}{
"Data": s.connMgr.ListConnections(page, pageSize),
"Prev": perv,
"Next": next,
"Count": total,
"ConnType": connType,
"ConnectionList": s.connMgr.ListConnections(connType, page, pageSize),
"CurrentPage": page,
"TotalPage": total / pageSize,
"PageSize": pageSize,
"Prev": perv,
"Next": next,
"Count": total,
"ActiveCount": activeCount,
"ClosedCount": closedCount,
"AllCount": s.connMgr.CountConnection("active") + s.connMgr.CountConnection("closed"),
})
}
Loading

0 comments on commit 751ee92

Please sign in to comment.