Skip to content

Commit

Permalink
restructure eventbridge server
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatoleAM committed Dec 15, 2023
1 parent 8bb8a27 commit 905c758
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 46 deletions.
101 changes: 55 additions & 46 deletions internal/api/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,38 @@ import (

const SESSION_ID_KEY = utils.Key("session_id")

func handle(gctx global.Context, body []byte) ([]events.Message[json.RawMessage], error) {
type eventBridge struct {
gctx global.Context
}

// The EventAPI Bridge allows passing commands from the eventapi via the websocket
func New(gctx global.Context) <-chan struct{} {
done := make(chan struct{})

createUserStateLoader(gctx)

evb := eventBridge{gctx}

go func() {
http.HandleFunc("/", evb.handleEventBridgeOp)

err := http.ListenAndServe(gctx.Config().EventBridge.Bind, nil)
if err != nil {
zap.S().Errorw("eventapi bridge failed", "error", err)

close(done)
}
}()

go func() {
<-gctx.Done()
close(done)
}()

return done
}

func (evb *eventBridge) handle(gctx global.Context, body []byte) ([]events.Message[json.RawMessage], error) {
var err error

req := getCommandBody[json.RawMessage](body)
Expand All @@ -36,59 +67,37 @@ func handle(gctx global.Context, body []byte) ([]events.Message[json.RawMessage]
return result, err
}

// The EventAPI Bridge allows passing commands from the eventapi via the websocket
func New(gctx global.Context) <-chan struct{} {
done := make(chan struct{})

createUserStateLoader(gctx)

go func() {
err := http.ListenAndServe(gctx.Config().EventBridge.Bind, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error

// read body into byte slice
if r.Body == nil {
zap.S().Errorw("invalid eventapi bridge message", "err", "empty body")
}

defer r.Body.Close()

var buf bytes.Buffer
if _, err = buf.ReadFrom(r.Body); err != nil {
zap.S().Errorw("invalid eventapi bridge message", "err", err)

return
}
func (evb *eventBridge) handleEventBridgeOp(w http.ResponseWriter, r *http.Request) {
var err error

result, err := handle(gctx, buf.Bytes())
if err != nil {
zap.S().Errorw("eventapi bridge command failed", "error", err)
}
// read body into byte slice
if r.Body == nil {
zap.S().Errorw("invalid eventapi bridge message", "err", "empty body")
}

if result == nil {
result = []events.Message[json.RawMessage]{}
}
defer r.Body.Close()

if err := json.NewEncoder(w).Encode(result); err != nil {
zap.S().Errorw("eventapi bridge command failed", "error", err)
}
var buf bytes.Buffer
if _, err = buf.ReadFrom(r.Body); err != nil {
zap.S().Errorw("invalid eventapi bridge message", "err", err)

w.WriteHeader(200)
}))
return
}

if err != nil {
zap.S().Errorw("eventapi bridge failed", "error", err)
result, err := evb.handle(evb.gctx, buf.Bytes())
if err != nil {
zap.S().Errorw("eventapi bridge command failed", "error", err)
}

close(done)
}
}()
if result == nil {
result = []events.Message[json.RawMessage]{}
}

go func() {
<-gctx.Done()
close(done)
}()
if err := json.NewEncoder(w).Encode(result); err != nil {
zap.S().Errorw("eventapi bridge command failed", "error", err)
}

return done
w.WriteHeader(200)
}

func getCommandBody[T events.BridgedCommandBody](body []byte) events.BridgedCommandPayload[T] {
Expand Down

0 comments on commit 905c758

Please sign in to comment.