Skip to content

Commit

Permalink
Merge pull request #94 from nyrahul/main
Browse files Browse the repository at this point in the history
event channel support
  • Loading branch information
nyrahul authored Jun 28, 2022
2 parents 782b872 + c114afd commit 66830e3
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 38 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/ci-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,17 @@ jobs:
uses: morphy2k/revive-action@v2
with:
path: "./..."

unit-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout Source
uses: actions/checkout@v2

- uses: actions/setup-go@v2
with:
go-version: v1.18

- name: Run unit tests
run: make test

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ install: build
clean:
cd $(CURDIR); rm -f karmor

.PHONY: test
test:
cd $(CURDIR); go test -v ./...

.PHONY: protobuf
vm-protobuf:
cd $(CURDIR)/vm/protobuf; protoc --proto_path=. --go_opt=paths=source_relative --go_out=plugins=grpc:. vm.proto
Expand Down
58 changes: 36 additions & 22 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ type Options struct {
Resource string
Limit uint32
Selector []string
EventChan chan interface{} // channel to send events on
}

// StopChan Channel
var StopChan chan struct{}
var sigChan chan os.Signal
var unblockSignal = false

// GetOSSigChannel Function
func GetOSSigChannel() chan os.Signal {
Expand Down Expand Up @@ -95,21 +98,25 @@ func regexCompile(o Options) error {
return nil
}

func closeStopChan() {
if StopChan == nil {
return
}
close(StopChan)
StopChan = nil
}

// StartObserver Function
func StartObserver(o Options) error {
gRPC := ""
gRPC := "localhost:32767"

if o.GRPC != "" {
gRPC = o.GRPC
} else {
if val, ok := os.LookupEnv("KUBEARMOR_SERVICE"); ok {
gRPC = val
} else {
gRPC = "localhost:32767"
}
} else if val, ok := os.LookupEnv("KUBEARMOR_SERVICE"); ok {
gRPC = val
}

fmt.Println("gRPC server: " + gRPC)
fmt.Fprintln(os.Stderr, "gRPC server: "+gRPC)

if o.MsgPath == "none" && o.LogPath == "none" {
flag.PrintDefaults()
Expand All @@ -126,18 +133,18 @@ func StartObserver(o Options) error {
if logClient == nil {
return errors.New("failed to connect to the gRPC server\nPossible troubleshooting:\n- Check if Kubearmor is running\n- Create a portforward to KubeArmor relay service using\n\t\033[1mkubectl -n kube-system port-forward service/kubearmor --address 0.0.0.0 --address :: 32767:32767\033[0m\n- Configure grpc server information using\n\t\033[1mkarmor log --grpc <info>\033[0m")
}
fmt.Printf("Created a gRPC client (%s)\n", gRPC)
fmt.Fprintf(os.Stderr, "Created a gRPC client (%s)\n", gRPC)

// do healthcheck
if ok := logClient.DoHealthCheck(); !ok {
return errors.New("failed to check the liveness of the gRPC server")
}
fmt.Println("Checked the liveness of the gRPC server")
fmt.Fprintln(os.Stderr, "Checked the liveness of the gRPC server")

if o.MsgPath != "none" {
// watch messages
go logClient.WatchMessages(o.MsgPath, o.JSON)
fmt.Println("Started to watch messages")
fmt.Fprintln(os.Stderr, "Started to watch messages")
}

err := regexCompile(o)
Expand All @@ -150,13 +157,13 @@ func StartObserver(o Options) error {
if o.LogFilter == "all" || o.LogFilter == "policy" {
// watch alerts
go logClient.WatchAlerts(o)
fmt.Println("Started to watch alerts")
fmt.Fprintln(os.Stderr, "Started to watch alerts")
}

if o.LogFilter == "all" || o.LogFilter == "system" {
// watch logs
go logClient.WatchLogs(o)
fmt.Println("Started to watch logs")
fmt.Fprintln(os.Stderr, "Started to watch logs")
}
}

Expand All @@ -169,19 +176,26 @@ func StartObserver(o Options) error {
}
} else {
// listen for interrupt signals
sigChan := GetOSSigChannel()
<-sigChan
unblockSignal = false
sigChan = GetOSSigChannel()
for !unblockSignal {
time.Sleep(50 * time.Millisecond)
select {
case <-sigChan:
unblockSignal = true
default:
}
}
}
close(StopChan)
fmt.Fprintln(os.Stderr, "releasing grpc client")
closeStopChan()

logClient.Running = false
time.Sleep(time.Second * 1)

// destroy the client
if err := logClient.DestroyClient(); err != nil {
return err
}
fmt.Println("Destroyed the gRPC client")
return logClient.DestroyClient()
}

return nil
func StopObserver() {
unblockSignal = true
}
34 changes: 18 additions & 16 deletions log/logClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ func StrToFile(str, destFile string) {
if _, err := os.Stat(destFile); err != nil {
newFile, err := os.Create(filepath.Clean(destFile))
if err != nil {
fmt.Printf("Failed to create a file (%s, %s)\n", destFile, err.Error())
fmt.Fprintf(os.Stderr, "Failed to create a file (%s, %s)\n", destFile, err.Error())
return
}
if err := newFile.Close(); err != nil {
fmt.Printf("Failed to close the file (%s, %s)\n", destFile, err.Error())
fmt.Fprintf(os.Stderr, "Failed to close the file (%s, %s)\n", destFile, err.Error())
}
}

// #nosec
file, err := os.OpenFile(destFile, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
fmt.Printf("Failed to open a file (%s, %s)\n", destFile, err.Error())
fmt.Fprintf(os.Stderr, "Failed to open a file (%s, %s)\n", destFile, err.Error())
}
defer func() {
if err := file.Close(); err != nil {
fmt.Printf("Failed to close the file (%s, %s)\n", destFile, err.Error())
fmt.Fprintf(os.Stderr, "Failed to close the file (%s, %s)\n", destFile, err.Error())
}
}()

_, err = file.WriteString(str)
if err != nil {
fmt.Printf("Failed to write a string into the file (%s, %s)\n", destFile, err.Error())
fmt.Fprintf(os.Stderr, "Failed to write a string into the file (%s, %s)\n", destFile, err.Error())
}
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func (fd *Feeder) WatchMessages(msgPath string, jsonFormat bool) error {
for fd.Running {
res, err := fd.msgStream.Recv()
if err != nil {
fmt.Printf("Failed to receive a message (%s)\n", err.Error())
fmt.Fprintf(os.Stderr, "Failed to receive a message (%s)\n", err.Error())
break
}

Expand All @@ -192,12 +192,12 @@ func (fd *Feeder) WatchMessages(msgPath string, jsonFormat bool) error {

if msgPath == "stdout" {
fmt.Printf("%s", str)
} else {
} else if msgPath != "" {
StrToFile(str, msgPath)
}
}

fmt.Println("Stopped WatchMessages")
fmt.Fprintln(os.Stderr, "Stopped WatchMessages")

return nil
}
Expand Down Expand Up @@ -271,6 +271,9 @@ func watchAlertsHelper(res *pb.Alert, o Options) error {

str := ""

if o.EventChan != nil {
o.EventChan <- *res
}
if o.JSON {
arr, _ := json.Marshal(res)
str = fmt.Sprintf("%s\n", string(arr))
Expand Down Expand Up @@ -325,7 +328,7 @@ func watchAlertsHelper(res *pb.Alert, o Options) error {

if o.LogPath == "stdout" {
fmt.Printf("%s", str)
} else {
} else if o.LogPath != "" {
StrToFile(str, o.LogPath)
}
return nil
Expand All @@ -340,7 +343,6 @@ func (fd *Feeder) WatchAlerts(o Options) error {
for i = 0; i < o.Limit; i++ {
res, err := fd.alertStream.Recv()
if err != nil {
fmt.Printf("Failed to receive an alert (%s)\n", err.Error())
break
}
_ = watchAlertsHelper(res, o)
Expand All @@ -352,15 +354,14 @@ func (fd *Feeder) WatchAlerts(o Options) error {
for fd.Running {
res, err := fd.alertStream.Recv()
if err != nil {
fmt.Printf("Failed to receive an alert (%s)\n", err.Error())
break
}
_ = watchAlertsHelper(res, o)

}
}

fmt.Println("Stopped WatchAlerts")
fmt.Fprintln(os.Stderr, "Stopped WatchAlerts")

return nil
}
Expand Down Expand Up @@ -425,6 +426,9 @@ func WatchLogsHelper(res *pb.Log, o Options) error {

str := ""

if o.EventChan != nil {
o.EventChan <- *res
}
if o.JSON {
arr, _ := json.Marshal(res)
str = fmt.Sprintf("%s\n", string(arr))
Expand Down Expand Up @@ -459,7 +463,7 @@ func WatchLogsHelper(res *pb.Log, o Options) error {

if o.LogPath == "stdout" {
fmt.Printf("%s", str)
} else {
} else if o.LogPath != "" {
StrToFile(str, o.LogPath)
}
return nil
Expand All @@ -475,7 +479,6 @@ func (fd *Feeder) WatchLogs(o Options) error {
for i = 0; i < o.Limit; i++ {
res, err := fd.logStream.Recv()
if err != nil {
fmt.Printf("Failed to receive an alert (%s)\n", err.Error())
break
}
_ = WatchLogsHelper(res, o)
Expand All @@ -485,15 +488,14 @@ func (fd *Feeder) WatchLogs(o Options) error {
for fd.Running {
res, err := fd.logStream.Recv()
if err != nil {
fmt.Printf("Failed to receive an alert (%s)\n", err.Error())
break
}
_ = WatchLogsHelper(res, o)

}
}

fmt.Println("Stopped WatchLogs")
fmt.Fprintln(os.Stderr, "Stopped WatchLogs")

return nil
}
Expand Down
60 changes: 60 additions & 0 deletions log/logClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package log

import (
"fmt"
"reflect"
"testing"
"time"

pb "github.com/kubearmor/KubeArmor/protobuf"
)

var eventChan chan interface{}
var gotAlerts = 0
var gotLogs = 0

const maxEvents = 5

func waitOnEvent(cnt int) {
for i := 0; i < cnt; i++ {
evtin := <-eventChan
switch evt := evtin.(type) {
case pb.Alert:
gotAlerts++
case pb.Log:
gotLogs++
default:
fmt.Printf("unknown event rcvd %v\n", reflect.TypeOf(evt))
}
}
}

func TestLogClient(t *testing.T) {
var res = pb.Alert{
ClusterName: "breaking-bad",
HostName: "saymyname",
NamespaceName: "heisenberg",
PodName: "new-mexico",
Labels: "substance=meth,currency=usd",
ContainerID: "12345678901234567890",
ContainerName: "los-polos",
ContainerImage: "evergreen",
}
eventChan = make(chan interface{}, maxEvents)
var o = Options{
EventChan: eventChan,
}
for i := 0; i < maxEvents; i++ {
err := watchAlertsHelper(&res, o)
if err != nil {
t.Errorf("watchAlertsHelper failed\n")
}
}
go waitOnEvent(maxEvents)
for i := 0; i < 10 && gotAlerts < maxEvents; i++ {
time.Sleep(10 * time.Millisecond)
}
if gotAlerts < maxEvents {
t.Errorf("did not receive all the events")
}
}

0 comments on commit 66830e3

Please sign in to comment.