Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tcp/udp proxy feature #234

Closed
wants to merge 106 commits into from
Closed
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
b02b601
[tcpproxy] first commit
jxd1990 Sep 3, 2021
34fe9db
[tcpproxy] add layer4pipeline and layer4proxy(incomplete)
jxd1990 Sep 3, 2021
b689c5e
[tcpproxy] raw tcp proxy model
jxd1990 Sep 3, 2021
1885b16
[tcpproxy] add some function for tcp proxy model
Sep 4, 2021
05ebe9e
[util] add simple io buffer pool
jxd1990 Sep 7, 2021
831d246
[util] add timerpool(copy from nats-io/nats)
jxd1990 Sep 8, 2021
98ea2f5
[pipeline] layer4pipeline add license
jxd1990 Sep 9, 2021
b5de9a0
[layer4proxy] add read loop and write loop
jxd1990 Sep 9, 2021
391be99
[tcpproxy] create upstream conn
Sep 9, 2021
7431d90
[layer4proxy] resolve cycle dependence
jxd1990 Sep 10, 2021
0d33c87
[layer4proxy] extract connection(30%)
jxd1990 Sep 10, 2021
b0623c4
[layer4proxy] abstract connection operation
jxd1990 Sep 10, 2021
c314759
[layer4proxy] extract connection to client connection and upstream c…
jxd1990 Sep 12, 2021
534956f
[layer4proxy] simplify context methods
jxd1990 Sep 12, 2021
9395ff3
[layer4proxy]almost finish layer4pipeline and layer4context
jxd1990 Sep 13, 2021
c61b7c5
[util] add license for iobuffer(copy from mosn)
jxd1990 Sep 16, 2021
e78a512
[layer4proxy] add udp proxy(70%)
jxd1990 Sep 16, 2021
796e8f4
[connection] fix bug in connection util & add udp session
jxd1990 Sep 17, 2021
01bfd85
[iobufferpool] fix comment
jxd1990 Sep 17, 2021
3037b13
[layer4proxy] complete layer4 proxy method
jxd1990 Sep 17, 2021
d8908d2
[layer4proxy] resolve import cycle
jxd1990 Sep 17, 2021
068a704
[layer4proxy] finish majority layer4 proxy function
jxd1990 Oct 3, 2021
de51f31
[layer4proxy] simplify layer4 server
jxd1990 Oct 12, 2021
e6b8d43
[layer4proxy] extract downstream/upstream write buffer to layer4 context
jxd1990 Oct 12, 2021
f2c1017
[layer4proxy] rollback supervisor modify
jxd1990 Oct 13, 2021
ee0b146
[layer4proxy] rollback traffic controller and protocol package modify
jxd1990 Oct 13, 2021
87027b1
[layer4proxy] cleanup code
jxd1990 Oct 13, 2021
e2ea5bc
[layer4proxy] delete layer4 filter
jxd1990 Oct 13, 2021
48901c7
[layer4proxy] add missing license
jxd1990 Oct 13, 2021
d9293ca
[layer4proxy] change layer4 proxy Category
jxd1990 Oct 14, 2021
b5f3325
[layer4proxy] fix some minor problem
jxd1990 Oct 14, 2021
6defc6b
[layer4proxy] fix tcp connection close bug
jxd1990 Oct 15, 2021
ae9063f
[layer4proxy] fix upstream connect bug
jxd1990 Oct 15, 2021
6e8116b
[tcpproxy] remove udp proxy(something wrong on windows platform)
jxd1990 Oct 18, 2021
af4dc0e
[tcpproxy] remove read enable & fix log
jxd1990 Oct 19, 2021
5f5147f
[tcpproxy] extract connection goWithRecover method
jxd1990 Oct 19, 2021
bfbaa4a
[tcpproxy] fix code warning
jxd1990 Oct 19, 2021
b07d48e
[tcpproxy] fix code warning +1
jxd1990 Oct 19, 2021
23841b0
[tcpproxy] change err EOF variable to ErrEOF
jxd1990 Oct 19, 2021
d133528
[tcpproxy] fix revive warning
jxd1990 Oct 20, 2021
b04282e
[tcpproxy] finish first version of udp proxy
jxd1990 Oct 21, 2021
1cd74d8
[tcpproxy] add missing license
jxd1990 Oct 21, 2021
e158867
[tcpproxy] optimization for no response udp proxy scenes
jxd1990 Oct 21, 2021
b120e1a
[udpproxy] fix udp proxy bug
jxd1990 Oct 21, 2021
615b675
[layer4proxy] update timerpool license
jxd1990 Oct 24, 2021
b14f06d
[layer4proxy] extract ipfilters file in tcpproxy and udpproxy package…
jxd1990 Oct 24, 2021
b5cbd6d
[layer4proxy] change tcpproxy listen function to make code more readable
jxd1990 Oct 24, 2021
0862137
[layer4proxy] fix udp receive bug
jxd1990 Oct 24, 2021
9385a32
Merge branch 'main' into resolve_merge_conflict
jxd134 Oct 24, 2021
3e891e9
[layer4proxy] move layer4 ipfilters to util/ipfilter
jxd1990 Oct 25, 2021
2001239
Update GetReadBuffer function comment
jxd134 Oct 25, 2021
6911fcd
[layer4proxy] fix error modify
jxd1990 Oct 25, 2021
d966d6f
Merge branch 'resolve_merge_conflict' of https://github.com/jxd134/ea…
jxd1990 Oct 25, 2021
ceaaec1
[layer4proxy] update io buffer for tcp/udp proxy
jxd1990 Oct 26, 2021
6124cff
[layer4proxy] fix go.mod import
jxd1990 Oct 26, 2021
baf7e54
[layer4proxy] extract backend servers pool related code to util/layer…
jxd1990 Oct 27, 2021
99ad2f5
[layer4proxy] simplify udp server lifecycle
jxd1990 Oct 27, 2021
6a85708
[layer4proxy] fix udp session close data race bug
jxd1990 Oct 28, 2021
b6f9ea9
[layer4proxy] no need to protect check when connPool close function b…
jxd1990 Oct 28, 2021
2da5a57
[layer4proxy] fix udp proxy buffer bug
jxd1990 Oct 28, 2021
9a6803a
[layer4proxy] optimization udp proxy buffer len setting
jxd1990 Oct 28, 2021
51a0d43
[layer4proxy] simplify backend server spec name
jxd1990 Oct 28, 2021
a0c9290
[udpproxy] rename (downstream/upstream to client/server, oldpool to o…
jxd1990 Oct 30, 2021
3fb339d
[udpproxy] fix bug(closing old rules should happen after new rules ar…
jxd1990 Oct 30, 2021
4670fad
[udpproxy] fix bug(check pool rules before get next server)
jxd1990 Oct 30, 2021
f69f29a
[udpproxy] add comment for Layer4IPFilters
jxd1990 Oct 30, 2021
f8132bb
fix wrong comment
jxd134 Oct 30, 2021
4a29306
Merge branch 'main' into resolve_merge_conflict
jxd134 Oct 30, 2021
c00185f
[tcpproxy] rename downstrean/upstream to client/server in tcpproxy mo…
jxd1990 Oct 30, 2021
f8eee9a
Merge remote-tracking branch 'origin/resolve_merge_conflict' into res…
jxd1990 Oct 30, 2021
e1002b1
[tcpproxy] remove connected param & simplify connect method
jxd1990 Nov 1, 2021
802ce1f
[tcpproxy] simplify tcp connection
jxd1990 Nov 2, 2021
24cebfd
[udpproxy] get udp session by client ip
jxd1990 Nov 2, 2021
9645153
[udpproxy] remove udp server runtime cleanup method(session cleanup i…
jxd1990 Nov 2, 2021
3cd6524
[udpproxy] replace atomic with mutex in session close function
jxd1990 Nov 2, 2021
e3abd99
[layer4proxy] bug fix for udp connection pool
Nov 2, 2021
49df67e
[udpproxy] fix udp connection pool judge bug
jxd1990 Nov 3, 2021
e72fb2f
[tcpproxy] fix tcp client close bug & add comment
jxd1990 Nov 5, 2021
9149981
simplify connection start function
jxd134 Nov 6, 2021
51e77d2
[tcpproxy] remove unnecessary parameters `startOnce`
jxd1990 Nov 6, 2021
3dff752
[tcpproxy] early continue to reduce nesting
jxd1990 Nov 6, 2021
5b7d97b
[tcpproxy] add comment for limit listener connection
jxd1990 Nov 6, 2021
f0d1a61
[tcpproxy] no need to call `atmoic.LoadUint32` for `closed` param whe…
jxd1990 Nov 6, 2021
97d36db
[tcpproxy] refactor `startWriteLoop` function
jxd1990 Nov 6, 2021
ca33400
[layer4backend] pool rules is never empty, no need to check it
jxd1990 Nov 6, 2021
4f494c8
[layer4backend] fix listener stop bug(can not close exist connection)
jxd1990 Nov 6, 2021
a05d0a8
[layer4backend] fix timeout exception check bug
jxd1990 Nov 6, 2021
d311dc7
[tcpproxy] fix tcp bufferpool capacity bug(ref: https://github.com/go…
jxd1990 Nov 7, 2021
d5004a3
[tcpproxy] remove debug log
jxd1990 Nov 7, 2021
8be02cf
[tcpproxy] fix write loop busy loop bug
jxd1990 Nov 8, 2021
c93b12b
[tcpproxy] remove unused param
jxd1990 Nov 8, 2021
1785884
[udpproxy] fix udp runtime not initialized bug
jxd1990 Nov 9, 2021
9b19281
[udpproxy] fix udp session close bug & optimization udp byte buffer
jxd1990 Nov 9, 2021
a995b2d
[tcpproxy] remove unused param in tcp connection and checking
jxd1990 Nov 18, 2021
4c08d07
[tcpproxy] notify read/write loop to exit by connection timeout
jxd1990 Nov 24, 2021
7fb34e1
[tcpproxy] simplify io eof
jxd1990 Nov 24, 2021
98b638a
[tcpproxy] Optimize the read/write exit mechanism
jxd1990 Nov 25, 2021
db3b48d
Merge branch 'resolve_merge_conflict' of https://github.com.cnpmjs.or…
Dec 21, 2021
671d234
[layer4proxy] merge upstream/main code change
Dec 21, 2021
0e07f18
[udpproxy] refactor start session method
jxd1990 Dec 31, 2021
298d6c1
Merge remote-tracking branch 'origin/resolve_merge_conflict' into res…
jxd1990 Dec 31, 2021
af9a795
[util/layer4ipfilters] fix ipfilters create return value
jxd1990 Dec 31, 2021
8edcc7a
[util/layer4ipfilters] fix ipfilters nil check problem
jxd1990 Jan 4, 2022
59dc6e6
[tcpproxy] fix tcp connection close check log
jxd1990 Jan 4, 2022
97b8f1a
[tcpproxy/udpproxy] Add dummy TCP and UDP clients to facilitate revie…
jxd1990 Jan 4, 2022
fc145ec
[tcpproxy/udpproxy] add read/write loop channel to make sure close so…
jxd1990 Jan 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/context/httpcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type (
Method() string
SetMethod(method string)

// URL
// Scheme URL
Scheme() string
Host() string
SetHost(host string)
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/proxy/masterslavereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type (
// masterSlaveReader reads bytes to master,
// and synchronize them to slave.
// Currently only support one slave.
// Currently, only support one slave.
masterSlaveReader struct {
masterReader io.Reader
slaveReader io.Reader
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (hp *HTTPPipeline) DefaultSpec() interface{} {
return &Spec{}
}

// Init initilizes HTTPPipeline.
// Init initializes HTTPPipeline.
func (hp *HTTPPipeline) Init(superSpec *supervisor.Spec, muxMapper protocol.MuxMapper) {
hp.superSpec, hp.spec, hp.muxMapper = superSpec, superSpec.ObjectSpec().(*Spec), muxMapper

Expand Down
2 changes: 1 addition & 1 deletion pkg/object/httpserver/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (hs *HTTPServer) Inherit(superSpec *supervisor.Spec, previousGeneration sup
}
}

// Status is the wrapper of runtime's Status.
// Status is the wrapper of runtimes Status.
func (hs *HTTPServer) Status() *supervisor.Status {
return &supervisor.Status{
ObjectStatus: hs.runtime.Status(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/httpserver/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (r *runtime) reload(nextSuperSpec *supervisor.Spec, muxMapper protocol.MuxM

nextSpec := nextSuperSpec.ObjectSpec().(*Spec)

// r.limitListener does not created just after the process started and the config load for the first time.
// r.limitListener does not create just after the process started and the config load for the first time.
if nextSpec != nil && r.limitListener != nil {
r.limitListener.SetMaxConnection(nextSpec.MaxConnections)
}
Expand Down
279 changes: 279 additions & 0 deletions pkg/object/tcpproxy/backendserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package tcpproxy
jxd134 marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/serviceregistry"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/hashtool"
"github.com/megaease/easegress/pkg/util/stringtool"
)

const (
// PolicyRoundRobin is the policy of round-robin.
PolicyRoundRobin = "roundRobin"
// PolicyRandom is the policy of random.
PolicyRandom = "random"
// PolicyWeightedRandom is the policy of weighted random.
PolicyWeightedRandom = "weightedRandom"
// PolicyIPHash is the policy of ip hash.
PolicyIPHash = "ipHash"
)

type (
servers struct {
poolSpec *PoolSpec
super *supervisor.Supervisor

mutex sync.Mutex
serviceRegistry *serviceregistry.ServiceRegistry
serviceWatcher serviceregistry.ServiceWatcher
static *staticServers

done chan struct{}
}

staticServers struct {
count uint64
weightsSum int
servers []*Server
lb LoadBalance
}

// Server is proxy server.
Server struct {
Addr string `yaml:"url" jsonschema:"required,format=hostport"`
Tags []string `yaml:"tags" jsonschema:"omitempty,uniqueItems=true"`
Weight int `yaml:"weight" jsonschema:"omitempty,minimum=0,maximum=100"`
}

// LoadBalance is load balance for multiple servers.
LoadBalance struct {
Policy string `yaml:"policy" jsonschema:"required,enum=roundRobin,enum=random,enum=weightedRandom,enum=ipHash"`
}
)

func newServers(super *supervisor.Supervisor, poolSpec *PoolSpec) *servers {
s := &servers{
poolSpec: poolSpec,
super: super,
done: make(chan struct{}),
}

s.useStaticServers()
if poolSpec.ServiceRegistry == "" || poolSpec.ServiceName == "" {
return s
}

s.serviceRegistry = s.super.MustGetSystemController(serviceregistry.Kind).
Instance().(*serviceregistry.ServiceRegistry)
s.tryUseService()
s.serviceWatcher = s.serviceRegistry.NewServiceWatcher(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

go s.watchService()
return s
}

func (s *Server) String() string {
return fmt.Sprintf("%s,%v,%d", s.Addr, s.Tags, s.Weight)
}

func (s *servers) watchService() {
for {
select {
case <-s.done:
return
case event := <-s.serviceWatcher.Watch():
s.handleEvent(event)
}
}
}

func (s *servers) handleEvent(event *serviceregistry.ServiceEvent) {
s.useService(event.Instances)
}

func (s *servers) tryUseService() {
serviceInstanceSpecs, err := s.serviceRegistry.ListServiceInstances(s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

if err != nil {
logger.Errorf("get service %s/%s failed: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, err)
s.useStaticServers()
return
}
s.useService(serviceInstanceSpecs)
}

func (s *servers) useService(serviceInstanceSpecs map[string]*serviceregistry.ServiceInstanceSpec) {
var servers []*Server
for _, instance := range serviceInstanceSpecs {
servers = append(servers, &Server{
Addr: fmt.Sprintf("%s:%d", instance.Address, instance.Port),
Tags: instance.Tags,
Weight: instance.Weight,
})
}
if len(servers) == 0 {
logger.Errorf("%s/%s: empty service instance",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)
s.useStaticServers()
return
}

dynamicServers := newStaticServers(servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
if dynamicServers.len() == 0 {
logger.Errorf("%s/%s: no service instance satisfy tags: %v",
s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName, s.poolSpec.ServersTags)
s.useStaticServers()
}

logger.Infof("use dynamic service: %s/%s", s.poolSpec.ServiceRegistry, s.poolSpec.ServiceName)

s.mutex.Lock()
defer s.mutex.Unlock()
s.static = dynamicServers
}

func (s *servers) useStaticServers() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.static = newStaticServers(s.poolSpec.Servers, s.poolSpec.ServersTags, s.poolSpec.LoadBalance)
}

func (s *servers) snapshot() *staticServers {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.static
}

func (s *servers) len() int {
static := s.snapshot()
return static.len()
}

func (s *servers) next(cliAddr string) (*Server, error) {
static := s.snapshot()
if static.len() == 0 {
return nil, fmt.Errorf("no server available")
}
return static.next(cliAddr), nil
}

func (s *servers) close() {
close(s.done)

if s.serviceWatcher != nil {
s.serviceWatcher.Stop()
}
}

func newStaticServers(servers []*Server, tags []string, lb *LoadBalance) *staticServers {
if servers == nil {
servers = make([]*Server, 0)
}

ss := &staticServers{}
if lb == nil {
ss.lb.Policy = PolicyRoundRobin
} else {
ss.lb = *lb
}

defer ss.prepare()

if len(tags) == 0 {
ss.servers = servers
return ss
}

chosenServers := make([]*Server, 0)
for _, server := range servers {
for _, tag := range tags {
if stringtool.StrInSlice(tag, server.Tags) {
chosenServers = append(chosenServers, server)
break
}
}
}
ss.servers = chosenServers
return ss
}

func (ss *staticServers) prepare() {
for _, server := range ss.servers {
ss.weightsSum += server.Weight
}
}

func (ss *staticServers) len() int {
return len(ss.servers)
}

func (ss *staticServers) next(cliAddr string) *Server {
switch ss.lb.Policy {
case PolicyRoundRobin:
return ss.roundRobin()
case PolicyRandom:
return ss.random()
case PolicyWeightedRandom:
return ss.weightedRandom()
case PolicyIPHash:
return ss.ipHash(cliAddr)
}
logger.Errorf("BUG: unknown load balance policy: %s", ss.lb.Policy)
return ss.roundRobin()
}

func (ss *staticServers) roundRobin() *Server {
count := atomic.AddUint64(&ss.count, 1)
// NOTE: startEventLoop from 0.
count--
return ss.servers[int(count)%len(ss.servers)]
}

func (ss *staticServers) random() *Server {
return ss.servers[rand.Intn(len(ss.servers))]
}

func (ss *staticServers) weightedRandom() *Server {
randomWeight := rand.Intn(ss.weightsSum)
for _, server := range ss.servers {
randomWeight -= server.Weight
if randomWeight < 0 {
return server
}
}

logger.Errorf("BUG: weighted random can't pick a server: sum(%d) servers(%+v)",
ss.weightsSum, ss.servers)

return ss.random()
}

func (ss *staticServers) ipHash(cliAddr string) *Server {
sum32 := int(hashtool.Hash32(cliAddr))
return ss.servers[sum32%len(ss.servers)]
}
Loading