Skip to content

Commit

Permalink
fix: Broadcast of post to sse blocks execution (#11)
Browse files Browse the repository at this point in the history
It seems some issues around the Broadcast functionality was blocking my
entire post db writer causing the feed to go stale as no posts were
being written.

Two fixes have been implemented for this. First the Broadcast receiver
function no longer requests a mutex lock. If it tries to send a post to
a closed or nil channel, we don't really care. Locking for every post
coming in would likely lead to situations where multiple calls to
Broadcast would be waiting for the previous to complete sending channel
events.

Secondly the Broadcast call is called in a go routine (i.e. go
server.Broadcast(post)). This should allow the post subscriber in the
serve command to quickly process the posts regardless of how long the
broadcast function uses to loop over SSE clients.
  • Loading branch information
snorremd authored Nov 9, 2023
1 parent ed95262 commit 0a5c8eb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
5 changes: 4 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func serveCmd() *cli.Command {
Value: 3000,
},
},

Action: func(ctx *cli.Context) error {

log.Info("Starting Norsky feed generator")
Expand Down Expand Up @@ -100,9 +101,11 @@ func serveCmd() *cli.Command {
go func() {
for post := range postChan {
switch post := post.(type) {
// Don't crash if broadcast fails
case models.CreatePostEvent:
dbPostChan <- post
broadcaster.Broadcast(post) // Broadcast new post to SSE clients
// Broadcast without blocking
go broadcaster.Broadcast(post) // Broadcast new post to SSE clients
default:
dbPostChan <- post
}
Expand Down
9 changes: 5 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ func (b *Broadcaster) Broadcast(post models.CreatePostEvent) {
log.WithFields(log.Fields{
"clients": len(b.clients),
}).Info("Broadcasting post to SSE clients")

b.Lock()
defer b.Unlock()
for _, client := range b.clients {
client <- post
}
Expand All @@ -82,8 +79,12 @@ func (b *Broadcaster) AddClient(key string, client chan models.CreatePostEvent)
func (b *Broadcaster) RemoveClient(key string) {
b.Lock()
defer b.Unlock()
// Check if client channel exists in map
if _, ok := b.clients[key]; !ok {
close(b.clients[key])
// Close the channel unless it's already closed
if b.clients[key] != nil {
close(b.clients[key])
}
delete(b.clients, key)
}

Expand Down

0 comments on commit 0a5c8eb

Please sign in to comment.