Skip to content

Commit

Permalink
introduce reuse
Browse files Browse the repository at this point in the history
Signed-off-by: hslam <[email protected]>
  • Loading branch information
hslam committed Feb 19, 2023
1 parent f5ee69f commit be45992
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/hslam/buffer v0.0.0-20230217202846-e7b1b6ebf283
github.com/hslam/reuse v0.0.0-20230219162114-9a3f8d1f9550
github.com/hslam/scheduler v0.0.0-20211028175315-641598104976
github.com/hslam/sendfile v1.0.1
github.com/hslam/splice v1.0.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/hslam/buffer v0.0.0-20230217202846-e7b1b6ebf283 h1:HurKqAs9w/HX/Y+s/Y
github.com/hslam/buffer v0.0.0-20230217202846-e7b1b6ebf283/go.mod h1:Gvbj40hnzR54zoUOuDZqDi7aziar8UlkHXk6NVYLg2U=
github.com/hslam/mmap v1.0.0 h1:GSp55lZrPDhctob3yE0SqESBjzgCn9cP4iu4Pmmm+gE=
github.com/hslam/mmap v1.0.0/go.mod h1:mtuj54WoaupC65QteY9RubXVPkQT86Q/Xj0WPzRefFw=
github.com/hslam/reuse v0.0.0-20230219162114-9a3f8d1f9550 h1:FJ2dXfgNhK0rLpj0q0DsWOWmP+5WaZprgjbz27k7a5E=
github.com/hslam/reuse v0.0.0-20230219162114-9a3f8d1f9550/go.mod h1:zrrCu2XS412Bqv9D2+VPvjMeOhn70cmHikky63L65XE=
github.com/hslam/scheduler v0.0.0-20211028175315-641598104976 h1:7xDxY7ffYQjS+eM6/7zSvvxrGowxHIwGWHM+drPp/XA=
github.com/hslam/scheduler v0.0.0-20211028175315-641598104976/go.mod h1:5Lu1StnE7hhW2QwdImTNFLdzIUhAUjO7SlLco+H5h9A=
github.com/hslam/sendfile v1.0.1 h1:0OLb5VRwjdN3q9hkslfk5nPQIrH5UmGTjSuJtV4Zq+8=
Expand Down
20 changes: 14 additions & 6 deletions net_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package netpoll

import (
"errors"
ctx "context"
"net"
"sync/atomic"
)
Expand All @@ -24,10 +24,15 @@ type Server struct {
UnsharedWorkers int
// SharedWorkers do not work for consisted with other system.
SharedWorkers int
// TasksPerWorker do not work for consisted with other system.
TasksPerWorker int
netServer *netServer
closed int32
// If Control is not nil, it is called after creating the network
// connection but before binding it to the operating system.
//
// Network and address parameters passed to Control method are not
// necessarily the ones passed to Listen. For example, passing "tcp" to
// Listen will cause the Control function to be called with "tcp4" or "tcp6".
Control func(network, address strng, c syscall.RawConn) error
netServer *netServer
closed int32
}

// ListenAndServe listens on the network address and then calls
Expand All @@ -39,7 +44,10 @@ func (s *Server) ListenAndServe() error {
if atomic.LoadInt32(&s.closed) != 0 {
return ErrServerClosed
}
ln, err := net.Listen(s.Network, s.Address)
var listenConfig = net.ListenConfig{
Control: s.Control,
}
ln, err := listenConfig.Listen(ctx.Background(), s.Network, s.Address)
if err != nil {
return err
}
Expand Down
22 changes: 21 additions & 1 deletion net_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
package netpoll

import (
ctx "context"
"errors"
"github.com/hslam/buffer"
"github.com/hslam/reuse"
"github.com/hslam/scheduler"
"github.com/hslam/sendfile"
"github.com/hslam/splice"
Expand Down Expand Up @@ -40,6 +42,13 @@ type Server struct {
NoAsync bool
UnsharedWorkers int
SharedWorkers int
// If Control is not nil, it is called after creating the network
// connection but before binding it to the operating system.
//
// Network and address parameters passed to Control method are not
// necessarily the ones passed to Listen. For example, passing "tcp" to
// Listen will cause the Control function to be called with "tcp4" or "tcp6".
Control func(network, address string, c syscall.RawConn) error
addr net.Addr
ln net.Listener
netServer *netServer
Expand Down Expand Up @@ -70,7 +79,18 @@ func (s *Server) ListenAndServe() error {
if atomic.LoadInt32(&s.closed) != 0 {
return ErrServerClosed
}
ln, err := net.Listen(s.Network, s.Address)
var listenConfig = net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) (err error) {
var control = s.Control
if control != nil {
if err = control(network, address, c); err != nil {
return err
}
}
return reuse.Control(network, address, c)
},
}
ln, err := listenConfig.Listen(ctx.Background(), s.Network, s.Address)
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions net_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -1093,3 +1094,55 @@ func TestMinHeap(t *testing.T) {
}
}
}

func TestControl(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
},
}
network := "tcp"
addr := ":9999"
server := &Server{
Network: network,
Address: addr,
Handler: handler,
NoAsync: false,
Control: func(network, address string, c syscall.RawConn) error {
return nil
},
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := server.ListenAndServe(); err == nil {
t.Error()
}
}()
time.Sleep(time.Millisecond * 10)
conn, _ := net.Dial(network, addr)
msg := "Hello World"
msg = strings.Repeat(msg, 50)
if n, err := conn.Write([]byte(msg)); err != nil {
t.Error(err)
} else if n != len(msg) {
t.Error(n)
}
buf := make([]byte, len(msg))
if n, err := conn.Read(buf); err != nil {
t.Error(err)
} else if n != len(msg) {
t.Error(n)
} else if string(buf) != msg {
t.Error(string(buf))
}
time.Sleep(time.Millisecond * 500)
conn.Close()
server.Close()
wg.Wait()
}
162 changes: 162 additions & 0 deletions reuse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2023 Meng Huang ([email protected])
// This package is licensed under a MIT license that can be found in the LICENSE file.

//go:build darwin || dragonfly || freebsd || netbsd || openbsd || linux
// +build darwin dragonfly freebsd netbsd openbsd linux

package netpoll

import (
"github.com/hslam/reuse"
"net"
"sync"
"testing"
"time"
)

func TestReuseServerPort(t *testing.T) {
network := "tcp"
addr := ":9999"
msg := "Hello World"
wg := sync.WaitGroup{}
servers := make([]*Server, 2)
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
},
}
for i := 0; i < 2; i++ {
server := &Server{
Network: network,
Address: addr,
Handler: handler,
NoAsync: false,
}
servers[i] = server
wg.Add(1)
go func() {
defer wg.Done()
server.ListenAndServe()
}()
}
time.Sleep(time.Millisecond * 100)
localPort := 8888
d := net.Dialer{LocalAddr: &net.TCPAddr{Port: localPort}, Control: reuse.Control}
conn, err := d.Dial(network, addr)
if err != nil {
t.Error("dial failed:", err)
return
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error(err)
return
}
buf := make([]byte, 1024)
if n, err := conn.Read(buf); err != nil {
t.Error(err)
return
} else if n != len(msg) {
t.Errorf("%d %d", n, len(msg))
}
conn.Close()
for i := 0; i < 2; i++ {
servers[i].Close()
}
wg.Wait()
}

func TestReuseClientPort(t *testing.T) {
network := "tcp"
addr1 := ":9997"
addr2 := ":9998"
msg := "Hello World"
wg := sync.WaitGroup{}
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
},
}
server1 := &Server{
Network: network,
Address: addr1,
Handler: handler,
NoAsync: false,
}
wg.Add(1)
go func() {
defer wg.Done()
server1.ListenAndServe()
}()
server2 := &Server{
Network: network,
Address: addr2,
Handler: handler,
NoAsync: false,
}
wg.Add(1)
go func() {
defer wg.Done()
server2.ListenAndServe()
}()
time.Sleep(time.Millisecond * 100)
localPort := 8888
d := net.Dialer{LocalAddr: &net.TCPAddr{Port: localPort}, Control: reuse.Control}
wg.Add(1)
go func() {
defer wg.Done()
conn, err := d.Dial(network, addr1)
time.Sleep(time.Millisecond * 100)
if err != nil {
t.Error("dial failed:", err)
return
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error(err)
return
}
buf := make([]byte, 1024)
if n, err := conn.Read(buf); err != nil {
t.Error(err)
return
} else if n != len(msg) {
t.Errorf("%d %d", n, len(msg))
}
time.Sleep(time.Millisecond * 500)
conn.Close()
}()
wg.Add(1)
go func() {
defer wg.Done()
conn, err := d.Dial(network, addr2)
time.Sleep(time.Millisecond * 100)
if err != nil {
t.Error("dial failed:", err)
return
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error(err)
return
}
buf := make([]byte, 1024)
if n, err := conn.Read(buf); err != nil {
t.Error(err)
return
} else if n != len(msg) {
t.Errorf("%d %d", n, len(msg))
}
time.Sleep(time.Millisecond * 500)
conn.Close()
}()
time.Sleep(time.Second)
server1.Close()
server2.Close()
wg.Wait()
}

0 comments on commit be45992

Please sign in to comment.