From eca4c2acf79cb4f677637a159fd7a6cafbad171c Mon Sep 17 00:00:00 2001 From: WendelHime <6754291+WendelHime@users.noreply.github.com> Date: Fri, 31 May 2024 17:07:02 -0300 Subject: [PATCH] feat: replace lantern-shadowsocks references to outline-ss-server and outline-sdk --- shadowsocks/config.go | 4 +- shadowsocks/local.go | 104 +++++--------- shadowsocks/local_test.go | 121 ++++++++-------- shadowsocks/metrics.go | 266 ++++++++++++++++++++++++++++++++++++ shadowsocks/metrics_test.go | 119 ++++++++++++++++ shadowsocks/service.go | 77 +++++++++++ 6 files changed, 563 insertions(+), 128 deletions(-) create mode 100644 shadowsocks/metrics.go create mode 100644 shadowsocks/metrics_test.go create mode 100644 shadowsocks/service.go diff --git a/shadowsocks/config.go b/shadowsocks/config.go index 9523d095..e62e8b7e 100644 --- a/shadowsocks/config.go +++ b/shadowsocks/config.go @@ -7,8 +7,8 @@ import ( "container/list" "fmt" + "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/service" - outlineShadowsocks "github.com/Jigsaw-Code/outline-ss-server/shadowsocks" ) const ( @@ -46,7 +46,7 @@ func UpdateCipherList(cipherList service.CipherList, configs []CipherConfig) err if config.Secret == "" { return fmt.Errorf("Secret was not specified for cipher %s", config.ID) } - ci, err := outlineShadowsocks.NewCipher(cipher, config.Secret) + ci, err := shadowsocks.NewEncryptionKey(cipher, config.Secret) if err != nil { return fmt.Errorf("Failed to create cipher entry (%v, %v, %v) : %w", config.ID, config.Cipher, config.Secret, err) } diff --git a/shadowsocks/local.go b/shadowsocks/local.go index 911058b9..de2e3ffe 100644 --- a/shadowsocks/local.go +++ b/shadowsocks/local.go @@ -3,15 +3,14 @@ package shadowsocks import ( "errors" "net" - "sync" "time" "github.com/getlantern/golog" "github.com/getlantern/netx" + "github.com/Jigsaw-Code/outline-sdk/transport" onet "github.com/Jigsaw-Code/outline-ss-server/net" "github.com/Jigsaw-Code/outline-ss-server/service" - "github.com/Jigsaw-Code/outline-ss-server/service/metrics" ) // shadowsocks/local.go houses adapters for use with Lantern. This mostly is in @@ -37,35 +36,6 @@ type HandleLocalPredicate func(addr string) bool // AlwaysLocal is a HandleLocalPredicate that requests local handling for all addresses func AlwaysLocal(addr string) bool { return true } -func maybeLocalDialer(isLocal HandleLocalPredicate, handleLocal service.TargetDialer, handleUpstream service.TargetDialer) service.TargetDialer { - return func(tgtAddr string, clientTCPConn onet.TCPConn, proxyMetrics *metrics.ProxyMetrics, targetIPValidator onet.TargetIPValidator) (onet.TCPConn, *onet.ConnectionError) { - if isLocal(tgtAddr) { - return handleLocal(tgtAddr, clientTCPConn, proxyMetrics, targetIPValidator) - } else { - return handleUpstream(tgtAddr, clientTCPConn, proxyMetrics, targetIPValidator) - } - } -} - -type ListenerOptions struct { - Listener onet.TCPListener - Ciphers service.CipherList - ReplayCache *service.ReplayCache - Timeout time.Duration - ShouldHandleLocally HandleLocalPredicate // determines whether an upstream should be handled by the listener locally or dial upstream - TargetIPValidator onet.TargetIPValidator // determines validity of non-local upstream dials - MaxPendingConnections int // defaults to 1000 -} - -type llistener struct { - service.TCPService - wrapped net.Listener - connections chan net.Conn - closedSignal chan struct{} - closeOnce sync.Once - closeError error -} - // ListenLocalTCP creates a net.Listener that returns all inbound shadowsocks connections to the // returned listener rather than dialing upstream. Any upstream or local handling should be handled by the // caller of Accept(). @@ -77,9 +47,10 @@ func ListenLocalTCP( replayCache := service.NewReplayCache(replayHistory) options := &ListenerOptions{ - Listener: &tcpListenerAdapter{l}, - Ciphers: ciphers, - ReplayCache: &replayCache, + Listener: &tcpListenerAdapter{l}, + Ciphers: ciphers, + ReplayCache: &replayCache, + ShadowsocksMetrics: &service.NoOpTCPMetrics{}, } return ListenLocalTCPOptions(options), nil @@ -117,26 +88,29 @@ func ListenLocalTCPOptions(options *ListenerOptions) net.Listener { isLocal = AlwaysLocal } - dialer := maybeLocalDialer(isLocal, l.dialPipe, service.DefaultDialTarget) - l.TCPService = service.NewTCPService( - options.Ciphers, - options.ReplayCache, - &metrics.NoOpMetrics{}, - timeout, - &service.TCPServiceOptions{ - DialTarget: dialer, - TargetIPValidator: validator, - }, - ) - - go func() { - err := l.Serve(options.Listener) - if err != nil { - log.Errorf("serving on %s: %v", l.Addr(), err) + authFunc := service.NewShadowsocksStreamAuthenticator(options.Ciphers, options.ReplayCache, options.ShadowsocksMetrics) + tcpHandler := service.NewTCPHandler(options.Listener.Addr().(*net.TCPAddr).Port, authFunc, options.ShadowsocksMetrics, tcpReadTimeout) + accept := func() (transport.StreamConn, error) { + if listener, ok := l.wrapped.(*tcpListenerAdapter); ok { + conn, err := listener.AcceptTCP() + if err == nil { + conn.SetKeepAlive(true) + if options.Accept != nil { + options.Accept(conn) + } + } + return conn, err } - l.Close() - }() + listener := l.wrapped.(*net.TCPListener) + conn, err := listener.AcceptTCP() + if err == nil { + conn.SetKeepAlive(true) + } + return conn, err + } + + go service.StreamServe(accept, tcpHandler.Handle) return l } @@ -145,10 +119,13 @@ func (l *llistener) Accept() (net.Conn, error) { select { case conn, ok := <-l.connections: if !ok { + log.Debug("No connection available at channel") return nil, ErrListenerClosed } + log.Debug("received connection") return conn, nil case <-l.closedSignal: + log.Debug("received closed signal") return nil, ErrListenerClosed } } @@ -157,7 +134,7 @@ func (l *llistener) Accept() (net.Conn, error) { func (l *llistener) Close() error { l.closeOnce.Do(func() { close(l.closedSignal) - l.closeError = l.Stop() + l.closeError = l.wrapped.Close() }) return l.closeError } @@ -167,23 +144,6 @@ func (l *llistener) Addr() net.Addr { return l.wrapped.Addr() } -// dialPipe is the dialer used by the shadowsocks tcp service when handling the upstream locally. -// When the shadowsocks TcpService dials upstream, one end of a duplex Pipe is returned to it -// and the other end is issued to the consumer of the Listener. -func (l *llistener) dialPipe(addr string, clientTCPConn onet.TCPConn, proxyMetrics *metrics.ProxyMetrics, targetIPValidator onet.TargetIPValidator) (onet.TCPConn, *onet.ConnectionError) { - c1, c2 := net.Pipe() - - // this is returned to the shadowsocks handler as the upstream connection - a := metrics.MeasureConn(&tcpConnAdapter{c1}, &proxyMetrics.ProxyTarget, &proxyMetrics.TargetProxy) - - // this is returned via the Listener as a client connection - b := &lfwd{c2, clientTCPConn, clientTCPConn.RemoteAddr(), addr} - - l.connections <- b - - return a, nil -} - // this is an adapter that fulfills the expectation // of the shadowsocks handler that it can independently // close the read and write on it's upstream side. @@ -244,7 +204,7 @@ type tcpListenerAdapter struct { net.Listener } -func (l *tcpListenerAdapter) AcceptTCP() (onet.TCPConn, error) { +func (l *tcpListenerAdapter) AcceptTCP() (TCPConn, error) { conn, err := l.Listener.Accept() if err != nil { return nil, err @@ -258,7 +218,7 @@ func (l *tcpListenerAdapter) AcceptTCP() (onet.TCPConn, error) { // is also available if needed. type lfwd struct { net.Conn - clientTCPConn onet.TCPConn + clientTCPConn net.Conn remoteAddr net.Addr upstreamTarget string } diff --git a/shadowsocks/local_test.go b/shadowsocks/local_test.go index eb4ef32a..454413c9 100644 --- a/shadowsocks/local_test.go +++ b/shadowsocks/local_test.go @@ -2,10 +2,10 @@ package shadowsocks import ( "bytes" + "context" + "crypto/rand" "fmt" - "math/rand" "net" - "strconv" "testing" "time" @@ -14,9 +14,9 @@ import ( "github.com/getlantern/fdcount" "github.com/getlantern/grtrack" - "github.com/Jigsaw-Code/outline-ss-server/client" + "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/service" - outlineShadowsocks "github.com/Jigsaw-Code/outline-ss-server/shadowsocks" "github.com/stretchr/testify/require" ) @@ -28,12 +28,22 @@ func makeTestCiphers(secrets []string) (service.CipherList, error) { configs := make([]CipherConfig, len(secrets)) for i, secret := range secrets { configs[i].Secret = secret + configs[i].Cipher = shadowsocks.CHACHA20IETFPOLY1305 } cipherList, err := NewCipherListWithConfigs(configs) return cipherList, err } +// makeTestSecrets returns a slice of `n` test passwords. Not secure! +func makeTestSecrets(n int) []string { + secrets := make([]string, n) + for i := 0; i < n; i++ { + secrets[i] = fmt.Sprintf("secret-%v", i) + } + return secrets +} + // tests interception of upstream connection func TestLocalUpstreamHandling(t *testing.T) { req := make([]byte, 1024) @@ -46,60 +56,61 @@ func TestLocalUpstreamHandling(t *testing.T) { l0, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) require.Nil(t, err, "ListenTCP failed: %v", err) - secrets := outlineShadowsocks.MakeTestSecrets(1) + secrets := makeTestSecrets(1) cipherList, err := makeTestCiphers(secrets) - require.Nil(t, err, "MakeTestCiphers failed: %v", err) + require.Nil(t, err, "M:akeTestCiphers failed: %v", err) + replayCache := service.NewReplayCache(1) options := &ListenerOptions{ - Listener: &tcpListenerAdapter{l0}, - Ciphers: cipherList, - // Metrics: testMetrics, - Timeout: 200 * time.Millisecond, + Listener: &tcpListenerAdapter{l0}, + Ciphers: cipherList, + Timeout: 200 * time.Millisecond, + ReplayCache: &replayCache, + ShadowsocksMetrics: &service.NoOpTCPMetrics{}, + } + + req = []byte("test") + accept := func(c net.Conn) { + buf := make([]byte, 2*len(req)) + n, err := c.Read(buf) + if err != nil { + log.Errorf("error reading: %v", err) + return + } + buf = buf[:n] + if !bytes.Equal(buf, req) { + log.Errorf("unexpected request %v %v, len buf: %d, len req: %d", buf, req, len(buf), len(req)) + return + } + c.Write(res) } + options.Accept = accept l1 := ListenLocalTCPOptions(options) defer l1.Close() - go func() { - for { - c, err := l1.Accept() - if err != nil { - return - } - - go func(c net.Conn) { - defer c.Close() - buf := make([]byte, 2*len(req)) - n, err := c.Read(buf) - if err != nil { - log.Errorf("error reading: %v", err) - return - } - buf = buf[:n] - if !bytes.Equal(buf, req) { - log.Errorf("unexpected request %v %v", buf, req) - return - } - c.Write(res) - }(c) - } - }() - - host, portStr, _ := net.SplitHostPort(l1.Addr().String()) - port, err := strconv.ParseInt(portStr, 10, 32) - require.Nil(t, err, "Error parsing port") - client, err := client.NewClient(host, int(port), secrets[0], outlineShadowsocks.TestCipher) + host, _, _ := net.SplitHostPort(l1.Addr().String()) + ciphers := cipherList.SnapshotForClientIP(net.ParseIP(host)) + require.NotEmpty(t, ciphers, "No ciphers available") + require.NotEmpty(t, ciphers[0].Value.(*service.CipherEntry).CryptoKey, "No crypto key available") + client, err := shadowsocks.NewStreamDialer( + &transport.TCPEndpoint{Address: l1.Addr().String()}, + ciphers[0].Value.(*service.CipherEntry).CryptoKey, + ) require.Nil(t, err, "Error creating client") - conn, err := client.DialTCP(nil, "127.0.0.1:443") + + conn, err := client.DialStream(context.Background(), "127.0.0.1:443") require.Nil(t, err, "failed to dial") - _, err = conn.Write(req) + defer conn.Close() + + n, err := conn.Write(req) require.Nil(t, err, "failed to write request") + log.Debugf("wrote %d bytes", n) buf := make([]byte, 2*len(res)) - n, err := conn.Read(buf) + n, err = conn.Read(buf) require.Nil(t, err, "failed to read response") require.Equal(t, res, buf[:n], "unexpected response") - conn.Close() } func TestConcurrentLocalUpstreamHandling(t *testing.T) { @@ -131,14 +142,17 @@ func TestConcurrentLocalUpstreamHandling(t *testing.T) { l0, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) require.Nil(t, err, "ListenTCP failed: %v", err) - secrets := outlineShadowsocks.MakeTestSecrets(1) + secrets := makeTestSecrets(1) cipherList, err := makeTestCiphers(secrets) require.Nil(t, err, "MakeTestCiphers failed: %v", err) + replayCache := service.NewReplayCache(1) options := &ListenerOptions{ - Listener: &tcpListenerAdapter{l0}, - Ciphers: cipherList, - Timeout: 200 * time.Millisecond, + Listener: &tcpListenerAdapter{l0}, + Ciphers: cipherList, + Timeout: 200 * time.Millisecond, + ReplayCache: &replayCache, + ShadowsocksMetrics: &service.NoOpTCPMetrics{}, } l1 := ListenLocalTCPOptions(options) @@ -174,16 +188,15 @@ func TestConcurrentLocalUpstreamHandling(t *testing.T) { req := reqs[rnum] res := []byte(ress[string(req)]) - host, portStr, _ := net.SplitHostPort(l1.Addr().String()) - port, err := strconv.ParseInt(portStr, 10, 32) + ciphers := cipherList.SnapshotForClientIP(net.ParseIP("127.0.0.1")) + require.NotEmpty(t, ciphers, "No ciphers available") + require.NotEmpty(t, ciphers[0].Value.(*service.CipherEntry).CryptoKey, "No crypto key available") + client, err := shadowsocks.NewStreamDialer(&transport.TCPEndpoint{Address: l1.Addr().String()}, ciphers[0].Value.(*service.CipherEntry).CryptoKey) if err != nil { return err } - client, err := client.NewClient(host, int(port), secrets[0], outlineShadowsocks.TestCipher) - if err != nil { - return err - } - conn, err := client.DialTCP(nil, "127.0.0.1:443") + + conn, err := client.DialStream(context.Background(), "127.0.0.1:443") if err != nil { return err } diff --git a/shadowsocks/metrics.go b/shadowsocks/metrics.go new file mode 100644 index 00000000..9d39cde8 --- /dev/null +++ b/shadowsocks/metrics.go @@ -0,0 +1,266 @@ +package shadowsocks + +import ( + "errors" + "fmt" + "net" + "strconv" + "time" + + "github.com/Jigsaw-Code/outline-ss-server/service/metrics" + "github.com/oschwald/geoip2-golang" + "github.com/prometheus/client_golang/prometheus" +) + +// ShadowsocksMetrics registers metrics for the Shadowsocks service. +type ShadowsocksMetrics interface { + SetBuildInfo(version string) + + GetLocation(net.Addr) (string, error) + + SetNumAccessKeys(numKeys int, numPorts int) + + // TCP metrics + AddOpenTCPConnection(clientLocation string) + AddClosedTCPConnection(clientLocation, accessKey, status string, data metrics.ProxyMetrics, timeToCipher, duration time.Duration) + AddTCPProbe(status, drainResult string, port int, data metrics.ProxyMetrics) + + // UDP metrics + AddUDPPacketFromClient(clientLocation, accessKey, status string, clientProxyBytes, proxyTargetBytes int, timeToCipher time.Duration) + AddUDPPacketFromTarget(clientLocation, accessKey, status string, targetProxyBytes, proxyClientBytes int) + AddUDPNatEntry() + RemoveUDPNatEntry() +} + +type shadowsocksMetrics struct { + ipCountryDB *geoip2.Reader + + buildInfo *prometheus.GaugeVec + accessKeys prometheus.Gauge + ports prometheus.Gauge + dataBytes *prometheus.CounterVec + dataBytesPerLocation *prometheus.CounterVec + timeToCipherMs *prometheus.HistogramVec + // TODO: Add time to first byte. + + tcpProbes *prometheus.HistogramVec + tcpOpenConnections *prometheus.CounterVec + tcpClosedConnections *prometheus.CounterVec + tcpConnectionDurationMs *prometheus.HistogramVec + + udpPacketsFromClientPerLocation *prometheus.CounterVec + udpAddedNatEntries prometheus.Counter + udpRemovedNatEntries prometheus.Counter +} + +func newShadowsocksMetrics(ipCountryDB *geoip2.Reader) *shadowsocksMetrics { + // Don't forget to pass the counters to the registerer.MustRegister call in NewPrometheusShadowsocksMetrics. + return &shadowsocksMetrics{ + ipCountryDB: ipCountryDB, + buildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "shadowsocks", + Name: "build_info", + Help: "Information on the outline-ss-server build", + }, []string{"version"}), + accessKeys: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "shadowsocks", + Name: "keys", + Help: "Count of access keys", + }), + ports: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "shadowsocks", + Name: "ports", + Help: "Count of open Shadowsocks ports", + }), + tcpProbes: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "shadowsocks", + Name: "tcp_probes", + Buckets: []float64{0, 49, 50, 51, 73, 91}, + Help: "Histogram of number of bytes from client to proxy, for detecting possible probes", + }, []string{"port", "status", "error"}), + tcpOpenConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "shadowsocks", + Subsystem: "tcp", + Name: "connections_opened", + Help: "Count of open TCP connections", + }, []string{"location"}), + tcpClosedConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "shadowsocks", + Subsystem: "tcp", + Name: "connections_closed", + Help: "Count of closed TCP connections", + }, []string{"location", "status", "access_key"}), + tcpConnectionDurationMs: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "shadowsocks", + Subsystem: "tcp", + Name: "connection_duration_ms", + Help: "TCP connection duration distributions.", + Buckets: []float64{ + 100, + float64(time.Second.Milliseconds()), + float64(time.Minute.Milliseconds()), + float64(time.Hour.Milliseconds()), + float64(24 * time.Hour.Milliseconds()), // Day + float64(7 * 24 * time.Hour.Milliseconds()), // Week + }, + }, []string{"status"}), + dataBytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "shadowsocks", + Name: "data_bytes", + Help: "Bytes transferred by the proxy, per access key", + }, []string{"dir", "proto", "access_key"}), + dataBytesPerLocation: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "shadowsocks", + Name: "data_bytes_per_location", + Help: "Bytes transferred by the proxy, per location", + }, []string{"dir", "proto", "location"}), + timeToCipherMs: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "shadowsocks", + Name: "time_to_cipher_ms", + Help: "Time needed to find the cipher", + Buckets: []float64{0.1, 1, 10, 100, 1000}, + }, []string{"proto", "found_key"}), + udpPacketsFromClientPerLocation: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "shadowsocks", + Subsystem: "udp", + Name: "packets_from_client_per_location", + Help: "Packets received from the client, per location and status", + }, []string{"location", "status"}), + udpAddedNatEntries: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "shadowsocks", + Subsystem: "udp", + Name: "nat_entries_added", + Help: "Entries added to the UDP NAT table", + }), + udpRemovedNatEntries: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "shadowsocks", + Subsystem: "udp", + Name: "nat_entries_removed", + Help: "Entries removed from the UDP NAT table", + }), + } +} + +// NewPrometheusShadowsocksMetrics constructs a metrics object that uses +// `ipCountryDB` to convert IP addresses to countries, and reports all +// metrics to Prometheus via `registerer`. `ipCountryDB` may be nil, but +// `registerer` must not be. +func NewPrometheusShadowsocksMetrics(ipCountryDB *geoip2.Reader, registerer prometheus.Registerer) ShadowsocksMetrics { + m := newShadowsocksMetrics(ipCountryDB) + // TODO: Is it possible to pass where to register the collectors? + registerer.MustRegister(m.buildInfo, m.accessKeys, m.ports, m.tcpProbes, m.tcpOpenConnections, m.tcpClosedConnections, m.tcpConnectionDurationMs, + m.dataBytes, m.dataBytesPerLocation, m.timeToCipherMs, m.udpPacketsFromClientPerLocation, m.udpAddedNatEntries, m.udpRemovedNatEntries) + return m +} + +const ( + errParseAddr = "XA" + errDbLookupError = "XD" + localLocation = "XL" + unknownLocation = "ZZ" +) + +func (m *shadowsocksMetrics) SetBuildInfo(version string) { + m.buildInfo.WithLabelValues(version).Set(1) +} + +func (m *shadowsocksMetrics) GetLocation(addr net.Addr) (string, error) { + if m.ipCountryDB == nil { + return "", nil + } + hostname, _, err := net.SplitHostPort(addr.String()) + if err != nil { + return errParseAddr, errors.New("Failed to split hostname and port") + } + ip := net.ParseIP(hostname) + if ip == nil { + return errParseAddr, errors.New("Failed to parse address as IP") + } + if ip.IsLoopback() { + return localLocation, nil + } + if !ip.IsGlobalUnicast() { + return localLocation, nil + } + record, err := m.ipCountryDB.Country(ip) + if err != nil { + return errDbLookupError, errors.New("IP lookup failed") + } + if record == nil { + return unknownLocation, errors.New("IP lookup returned nil") + } + if record.Country.IsoCode == "" { + return unknownLocation, errors.New("IP Lookup has empty ISO code") + } + return record.Country.IsoCode, nil +} + +func (m *shadowsocksMetrics) SetNumAccessKeys(numKeys int, ports int) { + m.accessKeys.Set(float64(numKeys)) + m.ports.Set(float64(ports)) +} + +func (m *shadowsocksMetrics) AddOpenTCPConnection(clientLocation string) { + m.tcpOpenConnections.WithLabelValues(clientLocation).Inc() +} + +// Converts accessKey to "true" or "false" +func isFound(accessKey string) string { + return fmt.Sprintf("%t", accessKey != "") +} + +// addIfNonZero helps avoid the creation of series that are always zero. +func addIfNonZero(value int64, counterVec *prometheus.CounterVec, lvs ...string) { + if value > 0 { + counterVec.WithLabelValues(lvs...).Add(float64(value)) + } +} + +func (m *shadowsocksMetrics) AddClosedTCPConnection(clientLocation, accessKey, status string, data metrics.ProxyMetrics, timeToCipher, duration time.Duration) { + m.tcpClosedConnections.WithLabelValues(clientLocation, status, accessKey).Inc() + m.tcpConnectionDurationMs.WithLabelValues(status).Observe(duration.Seconds() * 1000) + m.timeToCipherMs.WithLabelValues("tcp", isFound(accessKey)).Observe(timeToCipher.Seconds() * 1000) + addIfNonZero(data.ClientProxy, m.dataBytes, "c>p", "tcp", accessKey) + addIfNonZero(data.ClientProxy, m.dataBytesPerLocation, "c>p", "tcp", clientLocation) + addIfNonZero(data.ProxyTarget, m.dataBytes, "p>t", "tcp", accessKey) + addIfNonZero(data.ProxyTarget, m.dataBytesPerLocation, "p>t", "tcp", clientLocation) + addIfNonZero(data.TargetProxy, m.dataBytes, "pp", "udp", accessKey) + addIfNonZero(int64(clientProxyBytes), m.dataBytesPerLocation, "c>p", "udp", clientLocation) + addIfNonZero(int64(proxyTargetBytes), m.dataBytes, "p>t", "udp", accessKey) + addIfNonZero(int64(proxyTargetBytes), m.dataBytesPerLocation, "p>t", "udp", clientLocation) +} + +func (m *shadowsocksMetrics) AddUDPPacketFromTarget(clientLocation, accessKey, status string, targetProxyBytes, proxyClientBytes int) { + addIfNonZero(int64(targetProxyBytes), m.dataBytes, "p