Skip to content

Commit

Permalink
feat: replace lantern-shadowsocks references to outline-ss-server and…
Browse files Browse the repository at this point in the history
… outline-sdk
  • Loading branch information
WendelHime committed May 31, 2024
1 parent 1237b04 commit eca4c2a
Show file tree
Hide file tree
Showing 6 changed files with 563 additions and 128 deletions.
4 changes: 2 additions & 2 deletions shadowsocks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"container/list"
"fmt"

"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/service"
outlineShadowsocks "github.com/Jigsaw-Code/outline-ss-server/shadowsocks"
)

const (
Expand Down Expand Up @@ -46,7 +46,7 @@ func UpdateCipherList(cipherList service.CipherList, configs []CipherConfig) err
if config.Secret == "" {
return fmt.Errorf("Secret was not specified for cipher %s", config.ID)
}
ci, err := outlineShadowsocks.NewCipher(cipher, config.Secret)
ci, err := shadowsocks.NewEncryptionKey(cipher, config.Secret)
if err != nil {
return fmt.Errorf("Failed to create cipher entry (%v, %v, %v) : %w", config.ID, config.Cipher, config.Secret, err)
}
Expand Down
104 changes: 32 additions & 72 deletions shadowsocks/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package shadowsocks
import (
"errors"
"net"
"sync"
"time"

"github.com/getlantern/golog"
"github.com/getlantern/netx"

"github.com/Jigsaw-Code/outline-sdk/transport"
onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
)

// shadowsocks/local.go houses adapters for use with Lantern. This mostly is in
Expand All @@ -37,35 +36,6 @@ type HandleLocalPredicate func(addr string) bool
// AlwaysLocal is a HandleLocalPredicate that requests local handling for all addresses
func AlwaysLocal(addr string) bool { return true }

func maybeLocalDialer(isLocal HandleLocalPredicate, handleLocal service.TargetDialer, handleUpstream service.TargetDialer) service.TargetDialer {
return func(tgtAddr string, clientTCPConn onet.TCPConn, proxyMetrics *metrics.ProxyMetrics, targetIPValidator onet.TargetIPValidator) (onet.TCPConn, *onet.ConnectionError) {
if isLocal(tgtAddr) {
return handleLocal(tgtAddr, clientTCPConn, proxyMetrics, targetIPValidator)
} else {
return handleUpstream(tgtAddr, clientTCPConn, proxyMetrics, targetIPValidator)
}
}
}

type ListenerOptions struct {
Listener onet.TCPListener
Ciphers service.CipherList
ReplayCache *service.ReplayCache
Timeout time.Duration
ShouldHandleLocally HandleLocalPredicate // determines whether an upstream should be handled by the listener locally or dial upstream
TargetIPValidator onet.TargetIPValidator // determines validity of non-local upstream dials
MaxPendingConnections int // defaults to 1000
}

type llistener struct {
service.TCPService
wrapped net.Listener
connections chan net.Conn
closedSignal chan struct{}
closeOnce sync.Once
closeError error
}

// ListenLocalTCP creates a net.Listener that returns all inbound shadowsocks connections to the
// returned listener rather than dialing upstream. Any upstream or local handling should be handled by the
// caller of Accept().
Expand All @@ -77,9 +47,10 @@ func ListenLocalTCP(
replayCache := service.NewReplayCache(replayHistory)

options := &ListenerOptions{
Listener: &tcpListenerAdapter{l},
Ciphers: ciphers,
ReplayCache: &replayCache,
Listener: &tcpListenerAdapter{l},
Ciphers: ciphers,
ReplayCache: &replayCache,
ShadowsocksMetrics: &service.NoOpTCPMetrics{},
}

return ListenLocalTCPOptions(options), nil
Expand Down Expand Up @@ -117,26 +88,29 @@ func ListenLocalTCPOptions(options *ListenerOptions) net.Listener {
isLocal = AlwaysLocal
}

dialer := maybeLocalDialer(isLocal, l.dialPipe, service.DefaultDialTarget)
l.TCPService = service.NewTCPService(
options.Ciphers,
options.ReplayCache,
&metrics.NoOpMetrics{},
timeout,
&service.TCPServiceOptions{
DialTarget: dialer,
TargetIPValidator: validator,
},
)

go func() {
err := l.Serve(options.Listener)
if err != nil {
log.Errorf("serving on %s: %v", l.Addr(), err)
authFunc := service.NewShadowsocksStreamAuthenticator(options.Ciphers, options.ReplayCache, options.ShadowsocksMetrics)
tcpHandler := service.NewTCPHandler(options.Listener.Addr().(*net.TCPAddr).Port, authFunc, options.ShadowsocksMetrics, tcpReadTimeout)
accept := func() (transport.StreamConn, error) {
if listener, ok := l.wrapped.(*tcpListenerAdapter); ok {
conn, err := listener.AcceptTCP()
if err == nil {
conn.SetKeepAlive(true)
if options.Accept != nil {
options.Accept(conn)
}
}
return conn, err
}
l.Close()
}()

listener := l.wrapped.(*net.TCPListener)
conn, err := listener.AcceptTCP()
if err == nil {
conn.SetKeepAlive(true)
}
return conn, err
}

go service.StreamServe(accept, tcpHandler.Handle)
return l
}

Expand All @@ -145,10 +119,13 @@ func (l *llistener) Accept() (net.Conn, error) {
select {
case conn, ok := <-l.connections:
if !ok {
log.Debug("No connection available at channel")
return nil, ErrListenerClosed
}
log.Debug("received connection")
return conn, nil
case <-l.closedSignal:
log.Debug("received closed signal")
return nil, ErrListenerClosed
}
}
Expand All @@ -157,7 +134,7 @@ func (l *llistener) Accept() (net.Conn, error) {
func (l *llistener) Close() error {
l.closeOnce.Do(func() {
close(l.closedSignal)
l.closeError = l.Stop()
l.closeError = l.wrapped.Close()
})
return l.closeError
}
Expand All @@ -167,23 +144,6 @@ func (l *llistener) Addr() net.Addr {
return l.wrapped.Addr()
}

// dialPipe is the dialer used by the shadowsocks tcp service when handling the upstream locally.
// When the shadowsocks TcpService dials upstream, one end of a duplex Pipe is returned to it
// and the other end is issued to the consumer of the Listener.
func (l *llistener) dialPipe(addr string, clientTCPConn onet.TCPConn, proxyMetrics *metrics.ProxyMetrics, targetIPValidator onet.TargetIPValidator) (onet.TCPConn, *onet.ConnectionError) {
c1, c2 := net.Pipe()

// this is returned to the shadowsocks handler as the upstream connection
a := metrics.MeasureConn(&tcpConnAdapter{c1}, &proxyMetrics.ProxyTarget, &proxyMetrics.TargetProxy)

// this is returned via the Listener as a client connection
b := &lfwd{c2, clientTCPConn, clientTCPConn.RemoteAddr(), addr}

l.connections <- b

return a, nil
}

// this is an adapter that fulfills the expectation
// of the shadowsocks handler that it can independently
// close the read and write on it's upstream side.
Expand Down Expand Up @@ -244,7 +204,7 @@ type tcpListenerAdapter struct {
net.Listener
}

func (l *tcpListenerAdapter) AcceptTCP() (onet.TCPConn, error) {
func (l *tcpListenerAdapter) AcceptTCP() (TCPConn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
Expand All @@ -258,7 +218,7 @@ func (l *tcpListenerAdapter) AcceptTCP() (onet.TCPConn, error) {
// is also available if needed.
type lfwd struct {
net.Conn
clientTCPConn onet.TCPConn
clientTCPConn net.Conn
remoteAddr net.Addr
upstreamTarget string
}
Expand Down
121 changes: 67 additions & 54 deletions shadowsocks/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package shadowsocks

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"math/rand"
"net"
"strconv"
"testing"
"time"

Expand All @@ -14,9 +14,9 @@ import (
"github.com/getlantern/fdcount"
"github.com/getlantern/grtrack"

"github.com/Jigsaw-Code/outline-ss-server/client"
"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/service"
outlineShadowsocks "github.com/Jigsaw-Code/outline-ss-server/shadowsocks"
"github.com/stretchr/testify/require"
)

Expand All @@ -28,12 +28,22 @@ func makeTestCiphers(secrets []string) (service.CipherList, error) {
configs := make([]CipherConfig, len(secrets))
for i, secret := range secrets {
configs[i].Secret = secret
configs[i].Cipher = shadowsocks.CHACHA20IETFPOLY1305
}

cipherList, err := NewCipherListWithConfigs(configs)
return cipherList, err
}

// makeTestSecrets returns a slice of `n` test passwords. Not secure!
func makeTestSecrets(n int) []string {
secrets := make([]string, n)
for i := 0; i < n; i++ {
secrets[i] = fmt.Sprintf("secret-%v", i)
}
return secrets
}

// tests interception of upstream connection
func TestLocalUpstreamHandling(t *testing.T) {
req := make([]byte, 1024)
Expand All @@ -46,60 +56,61 @@ func TestLocalUpstreamHandling(t *testing.T) {

l0, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
require.Nil(t, err, "ListenTCP failed: %v", err)
secrets := outlineShadowsocks.MakeTestSecrets(1)
secrets := makeTestSecrets(1)
cipherList, err := makeTestCiphers(secrets)
require.Nil(t, err, "MakeTestCiphers failed: %v", err)
require.Nil(t, err, "M:akeTestCiphers failed: %v", err)

replayCache := service.NewReplayCache(1)
options := &ListenerOptions{
Listener: &tcpListenerAdapter{l0},
Ciphers: cipherList,
// Metrics: testMetrics,
Timeout: 200 * time.Millisecond,
Listener: &tcpListenerAdapter{l0},
Ciphers: cipherList,
Timeout: 200 * time.Millisecond,
ReplayCache: &replayCache,
ShadowsocksMetrics: &service.NoOpTCPMetrics{},
}

req = []byte("test")
accept := func(c net.Conn) {
buf := make([]byte, 2*len(req))
n, err := c.Read(buf)
if err != nil {
log.Errorf("error reading: %v", err)
return
}
buf = buf[:n]
if !bytes.Equal(buf, req) {
log.Errorf("unexpected request %v %v, len buf: %d, len req: %d", buf, req, len(buf), len(req))
return
}
c.Write(res)
}
options.Accept = accept

l1 := ListenLocalTCPOptions(options)
defer l1.Close()

go func() {
for {
c, err := l1.Accept()
if err != nil {
return
}

go func(c net.Conn) {
defer c.Close()
buf := make([]byte, 2*len(req))
n, err := c.Read(buf)
if err != nil {
log.Errorf("error reading: %v", err)
return
}
buf = buf[:n]
if !bytes.Equal(buf, req) {
log.Errorf("unexpected request %v %v", buf, req)
return
}
c.Write(res)
}(c)
}
}()

host, portStr, _ := net.SplitHostPort(l1.Addr().String())
port, err := strconv.ParseInt(portStr, 10, 32)
require.Nil(t, err, "Error parsing port")
client, err := client.NewClient(host, int(port), secrets[0], outlineShadowsocks.TestCipher)
host, _, _ := net.SplitHostPort(l1.Addr().String())
ciphers := cipherList.SnapshotForClientIP(net.ParseIP(host))
require.NotEmpty(t, ciphers, "No ciphers available")
require.NotEmpty(t, ciphers[0].Value.(*service.CipherEntry).CryptoKey, "No crypto key available")
client, err := shadowsocks.NewStreamDialer(
&transport.TCPEndpoint{Address: l1.Addr().String()},
ciphers[0].Value.(*service.CipherEntry).CryptoKey,
)
require.Nil(t, err, "Error creating client")
conn, err := client.DialTCP(nil, "127.0.0.1:443")

conn, err := client.DialStream(context.Background(), "127.0.0.1:443")
require.Nil(t, err, "failed to dial")
_, err = conn.Write(req)
defer conn.Close()

n, err := conn.Write(req)
require.Nil(t, err, "failed to write request")
log.Debugf("wrote %d bytes", n)

buf := make([]byte, 2*len(res))
n, err := conn.Read(buf)
n, err = conn.Read(buf)
require.Nil(t, err, "failed to read response")
require.Equal(t, res, buf[:n], "unexpected response")
conn.Close()
}

func TestConcurrentLocalUpstreamHandling(t *testing.T) {
Expand Down Expand Up @@ -131,14 +142,17 @@ func TestConcurrentLocalUpstreamHandling(t *testing.T) {

l0, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
require.Nil(t, err, "ListenTCP failed: %v", err)
secrets := outlineShadowsocks.MakeTestSecrets(1)
secrets := makeTestSecrets(1)
cipherList, err := makeTestCiphers(secrets)
require.Nil(t, err, "MakeTestCiphers failed: %v", err)

replayCache := service.NewReplayCache(1)
options := &ListenerOptions{
Listener: &tcpListenerAdapter{l0},
Ciphers: cipherList,
Timeout: 200 * time.Millisecond,
Listener: &tcpListenerAdapter{l0},
Ciphers: cipherList,
Timeout: 200 * time.Millisecond,
ReplayCache: &replayCache,
ShadowsocksMetrics: &service.NoOpTCPMetrics{},
}

l1 := ListenLocalTCPOptions(options)
Expand Down Expand Up @@ -174,16 +188,15 @@ func TestConcurrentLocalUpstreamHandling(t *testing.T) {
req := reqs[rnum]
res := []byte(ress[string(req)])

host, portStr, _ := net.SplitHostPort(l1.Addr().String())
port, err := strconv.ParseInt(portStr, 10, 32)
ciphers := cipherList.SnapshotForClientIP(net.ParseIP("127.0.0.1"))
require.NotEmpty(t, ciphers, "No ciphers available")
require.NotEmpty(t, ciphers[0].Value.(*service.CipherEntry).CryptoKey, "No crypto key available")
client, err := shadowsocks.NewStreamDialer(&transport.TCPEndpoint{Address: l1.Addr().String()}, ciphers[0].Value.(*service.CipherEntry).CryptoKey)
if err != nil {
return err
}
client, err := client.NewClient(host, int(port), secrets[0], outlineShadowsocks.TestCipher)
if err != nil {
return err
}
conn, err := client.DialTCP(nil, "127.0.0.1:443")

conn, err := client.DialStream(context.Background(), "127.0.0.1:443")
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit eca4c2a

Please sign in to comment.