Skip to content

Commit

Permalink
added workerpool and reconfiged netfilter to use it
Browse files Browse the repository at this point in the history
  • Loading branch information
imgurbot12 committed Jan 20, 2018
1 parent a399a1c commit 307f34d
Show file tree
Hide file tree
Showing 2 changed files with 317 additions and 65 deletions.
122 changes: 57 additions & 65 deletions netfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,93 +4,92 @@ import (
"log"
"os"
"os/signal"
"sync"
"syscall"

netfilter "github.com/AkihiroSuda/go-netfilter-queue"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
netfilter "github.com/AkihiroSuda/go-netfilter-queue"
)

/***Variables***/

type NetfilterQueue struct {
type NetFilterQueue struct {
// Set Variables
Handler func(*RBKV, *PacketData) netfilter.Verdict
QueueNum uint16
MaxWorkers int
Handler func(*log.Logger, *RBKV, *PacketData) netfilter.Verdict
QueueNum uint16
LogAllErrors bool
Logger *log.Logger

// queue handler objects
nfq *netfilter.NFQueue
pktQueue <-chan netfilter.NFPacket

// worker/class handler objects
started bool
wg sync.WaitGroup
wp *workerPool
}

/***Methods***/

//(*NetfilterQueue).Start : spawn nfq instance and start collecting packets
func (queue *NetfilterQueue) Start() {
//(*NetFilterQueue).start : spawn nfq instance and start collecting packets
func (q *NetFilterQueue) start() {
// check if already started
if queue.started {
log.Fatalf("NFQueue %d ALREADY STARTED!\n", queue.QueueNum)
if q.wp != nil {
q.Logger.Fatalf("NFQueue %d ALREADY STARTED!\n", q.QueueNum)
}
// spawn netfilter queue instance and start collecting packets
var err error
queue.nfq, err = netfilter.NewNFQueue(queue.QueueNum, 100, netfilter.NF_DEFAULT_PACKET_SIZE)
q.nfq, err = netfilter.NewNFQueue(q.QueueNum, 100, netfilter.NF_DEFAULT_PACKET_SIZE)
if err != nil {
log.Fatalf("NFQueue %d Error: %s\n", queue.QueueNum, err.Error())
log.Fatalf("NFQueue %d Error: %s\n", q.QueueNum, err.Error())
}
log.Printf("NFQueue: %d Initialized! Starting Workers...\n", queue.QueueNum)
log.Printf("NFQueue: %d Initialized! Starting WorkerPool...\n", q.QueueNum)
// set packet queue and started boolean
queue.pktQueue = queue.nfq.GetPackets()
queue.started = true
// start max number of workers
for i := 0; i < queue.MaxWorkers; i++ {
go queue.worker()
queue.wg.Add(1)
q.pktQueue = q.nfq.GetPackets()
// spawn workerpool
q.wp = &workerPool{
WorkerFunc: q.handlePacket,
MaxWorkersCount: 10 * 1024,
LogAllErrors: q.LogAllErrors,
Logger: q.Logger,
}
log.Println("Workers Started!")
}

//(*NetfilterQueue).Wait : wait for threads to finish FOREVER!!! (A really long time)
func (queue *NetfilterQueue) Wait() {
queue.wg.Wait()
q.wp.Start()
}

//(*NetfilterQueue).Stop : close nfq instance and stop collecting packets
func (queue *NetfilterQueue) Stop() {
//(*NetFilterQueue).stop : close nfq instance and stop collecting packets
func (q *NetFilterQueue) stop() {
// check if not started
if !queue.started {
log.Fatalf("NFQueue %d NEVER STARTED!\n", queue.QueueNum)
if q.wp == nil {
log.Fatalf("NFQueue %d NEVER STARTED!\n", q.QueueNum)
}
// close queue instance
queue.nfq.Close()
// close packet queue and set started boolean
queue.pktQueue = nil
queue.started = false
// close/stop everything
q.nfq.Close()
q.wp.Stop()
q.pktQueue = nil
}

func (queue *NetfilterQueue) Run() {
//(*NetFilterQueue).Run : run nfq indefinably and block until interrupt
func (q *NetFilterQueue) Run() {
// start netfilter queue instance
queue.Start()
// handle interupts
q.start()
// handle interrupts
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
for sig := range c {
log.Fatalf("Captured Signal: %v! Cleaning up...", sig)
queue.Stop()
log.Fatalf("Captured Signal: %s! Cleaning up...", sig.String())
q.stop()
}
}()
// wait possibly forever
queue.Wait()
// handle incoming packets
var p netfilter.NFPacket
for {
p = <- q.pktQueue
if !q.wp.Serve(p) {
log.Println("worker error! serving connection failed!")
}
}
}

//(*NetfilterQueue).parsePacket : parse gopacket and return collected packet data
func (queue *NetfilterQueue) parsePacket(packetin gopacket.Packet, packetout *PacketData) {
//(*NetFilterQueue).parsePacket : parse gopacket and return collected packet data
func (q *NetFilterQueue) parsePacket(packetin gopacket.Packet, packetout *PacketData) {
//get src and dst ip from ipv4
ipLayer := packetin.Layer(layers.LayerTypeIPv4)
if ipLayer != nil {
Expand All @@ -108,25 +107,18 @@ func (queue *NetfilterQueue) parsePacket(packetin gopacket.Packet, packetout *Pa
}
}

//(*NetfilterQueue).worker : worker instance used to set the verdict for queued packets
func (queue *NetfilterQueue) worker() {
// defer waitgroup completion
defer queue.wg.Done()
//(*NetFilterQueue).worker : worker instance used to set the verdict for queued packets
func (q *NetFilterQueue) handlePacket(p netfilter.NFPacket) error {
// init variables for packet handling
var (
nfqPacket netfilter.NFPacket //Reused netfilter packet object
dataPacket PacketData //Reused parsed packet data as struct
redblackkv *RBKV = NewRedBlackKV() //Reused key/value pair for red black tree caches
dataPacket PacketData //Reused parsed packet data as struct
redBlackKV = &RBKV{} //Reused key/value pair for red black tree caches
)
// loop while running forever
for queue.started {
// collect verdict packet from netfilerqueu
nfqPacket = <-queue.pktQueue
// parse packet for required information
queue.parsePacket(nfqPacket.Packet, &dataPacket)
// complete logic go get verfict on packet and set verdict
nfqPacket.SetVerdict(
queue.Handler(redblackkv, &dataPacket),
)
}
// parse packet for required information
q.parsePacket(p.Packet, &dataPacket)
// complete logic go get verdict on packet and set verdict
p.SetVerdict(
q.Handler(q.Logger, redBlackKV, &dataPacket),
)
return nil
}
Loading

0 comments on commit 307f34d

Please sign in to comment.