From 3fa933a5475ac43e9d56cc75e8bc809b1e4a5b05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Snorre=20Magnus=20Dav=C3=B8en?= Date: Sat, 21 Oct 2023 23:24:46 +0200 Subject: [PATCH] feat: Show posts from firehose on dashboard Add a list view of up to 500 firehose posts on the dashboard. This new feature introduces some changes in the plumbing in the serve command. Prevously there were only one channel that would allow the firehose to send create posts events to the db writer. Now both the db writer and server is interested in these events. Further more one channel is not enough for the server use case. In golang channels are single receiver. Even in a fan out pattern with multiple listeners, a message will be read by at most one receiver! To enable 1 to N broadcasting you need N channels. This commit introduces a new Broadcaster receiver struct that extends the sync.Mutex struct to handle locking so that go routines don't get into conflict when using the broadcaster. It contains a map of uuids keys (to identify a single SSE channel) and channel values. When a post is posted by the firehose to its channel glue code picks it up and passes it on to every channel in the broadcaster struct. To pass the posts to the client side dashboard Server Sent Events are used. The client creates an EventSource to initate the SSE connection. Upon connection the go server creates a channel and adds it to the broadcaster map. The first message passed by the server is the key identifying the SSE channel. This way the client can send a DELETE request before disconnecting to allow the server to clean up immediately. Otherwise the connection is held open until the underlying TCP connection disconnects and the server can no longer write to the connection. This can take a bit of time. The client shows the posts in a div which takes up at most 60% of the viewport height. it then uses scroll inside the element to allow scrolling posts. It only shows the data available in the firehose post format which means usernames and profile pictures are not available. --- cmd/serve.go | 37 +++++++-- db/reader.go | 20 ++--- db/writer.go | 5 +- feeds/feeds.go | 2 +- firehose/firehose.go | 1 + frontend/App.tsx | 104 ++++++++++++++++++++++++-- frontend/index.css | 15 ++++ go.mod | 6 +- go.sum | 12 +-- models/models.go | 20 +++-- server/server.go | 174 +++++++++++++++++++++++++++++++++++-------- 11 files changed, 318 insertions(+), 78 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index e9979dd..5e1089d 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -8,6 +8,7 @@ import ( "fmt" "norsky/db" "norsky/firehose" + "norsky/models" "norsky/server" "os" "os/signal" @@ -80,26 +81,48 @@ func serveCmd() *cli.Command { // Channel for subscribing to bluesky posts postChan := make(chan interface{}) + dbPostChan := make(chan interface{}) // Channel for writing posts to the database + broadcaster := server.NewBroadcaster() // Setup the server and firehose app := server.Server(&server.ServerConfig{ - Hostname: hostname, - Reader: db.NewReader(database), + Hostname: hostname, + Reader: db.NewReader(database), + Broadcaster: broadcaster, }) fh := firehose.New(postChan, ctx.Context) - dbwriter := db.NewWriter(database, postChan) + + dbwriter := db.NewWriter(database, dbPostChan) // Graceful shutdown via wait group c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) var wg sync.WaitGroup + // Graceful shutdown logic go func() { <-c fmt.Println("Gracefully shutting down...") - app.ShutdownWithTimeout(60 * time.Second) - defer wg.Add(-3) // Decrement the waitgroup counter by 2 after shutdown of server and firehose + app.ShutdownWithTimeout(5 * time.Second) // Wait 5 seconds for all connections to close fh.Shutdown() + broadcaster.Shutdown() + defer wg.Add(-4) // Decrement the waitgroup counter by 2 after shutdown of server and firehose + + }() + + // Some glue code to pass posts from the firehose to the database and/or broadcaster + // Ideally one might want to do this in a more elegant way + // TODO: Move broadcaster into server package, i.e. make server a receiver and handle broadcaster and fiber together + go func() { + for post := range postChan { + switch post := post.(type) { + case models.CreatePostEvent: + dbPostChan <- post + broadcaster.Broadcast(post) // Broadcast new post to SSE clients + default: + dbPostChan <- post + } + } }() go func() { @@ -109,6 +132,7 @@ func serveCmd() *cli.Command { go func() { fmt.Println("Starting server...") + if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil { log.Error(err) c <- os.Interrupt @@ -121,13 +145,12 @@ func serveCmd() *cli.Command { }() // Wait for both the server and firehose to shutdown - wg.Add(3) + wg.Add(4) wg.Wait() log.Info("Norsky feed generator stopped") return nil - }, } } diff --git a/db/reader.go b/db/reader.go index abfbf3a..597e40e 100644 --- a/db/reader.go +++ b/db/reader.go @@ -4,11 +4,9 @@ import ( "database/sql" "norsky/models" "strconv" - "strings" "time" sqlbuilder "github.com/huandu/go-sqlbuilder" - log "github.com/sirupsen/logrus" ) type Reader struct { @@ -26,12 +24,12 @@ func NewReader(database string) *Reader { } } -func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Post, error) { +func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.FeedPost, error) { // Return next limit posts after cursor, ordered by created_at and uri sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "uri", "created_at", "group_concat(language)").From("posts") + sb.Select("id", "uri").From("posts") if postId != 0 { sb.Where( sb.LessThan("id", postId), @@ -53,18 +51,15 @@ func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Po defer rows.Close() // Scan rows, collapse languages into single post - var posts []models.Post + var posts []models.FeedPost for rows.Next() { - var post models.Post - var langs string + var post models.FeedPost // Scan row and - if err := rows.Scan(&post.Id, &post.Uri, &post.CreatedAt, &langs); err != nil { + if err := rows.Scan(&post.Id, &post.Uri); err != nil { return nil, err } - // Split languages into a slice and add to the post model - post.Languages = strings.Split(langs, ",") posts = append(posts, post) } @@ -125,11 +120,6 @@ func (reader *Reader) GetPostCountPerTime(lang string, timeAgg string) ([]models sql, args := sb.BuildWithFlavor(sqlbuilder.Flavor(sqlbuilder.SQLite)) - log.WithFields(log.Fields{ - "sql": sql, - "args": args, - }).Info("Get posts per hour") - rows, err := reader.db.Query(sql, args...) if err != nil { return nil, err diff --git a/db/writer.go b/db/writer.go index 76aa05b..a7cf4a7 100644 --- a/db/writer.go +++ b/db/writer.go @@ -39,11 +39,10 @@ func (writer *Writer) Subscribe() { } case post := <-writer.postChan: - log.WithField("post", post).Info("Received post") switch post := post.(type) { - case models.CreatePostEvent: + case *models.CreatePostEvent: createPost(writer.db, post.Post) - case models.DeletePostEvent: + case *models.DeletePostEvent: deletePost(writer.db, post.Post) default: log.Info("Unknown post type") diff --git a/feeds/feeds.go b/feeds/feeds.go index 582d6d7..2a1edee 100644 --- a/feeds/feeds.go +++ b/feeds/feeds.go @@ -37,7 +37,7 @@ func genericAlgo(reader *db.Reader, cursor string, limit int, lang string) (*mod } if posts == nil { - posts = []models.Post{} + posts = []models.FeedPost{} } var nextCursor *string diff --git a/firehose/firehose.go b/firehose/firehose.go index 9452f85..01e4add 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -75,6 +75,7 @@ func (firehose *Firehose) Subscribe() { } func (firehose *Firehose) Shutdown() { + // TODO: Graceful shutdown here as "Error handling repo stream: read tcp use of closed network connection " firehose.scheduler.Shutdown() firehose.conn.Close() fmt.Println("Firehose shutdown") diff --git a/frontend/App.tsx b/frontend/App.tsx index 408aa8d..eb0da7e 100644 --- a/frontend/App.tsx +++ b/frontend/App.tsx @@ -5,13 +5,14 @@ import { createResource, Resource, Accessor, - createComputed, createMemo, - createEffect, + onCleanup, + For, } from "solid-js"; import { Chart, Title, Tooltip, Legend, Colors, TimeScale, ChartDataset, ChartType, Point, TimeUnit, TimeSeriesScale } from "chart.js"; import { Line } from "solid-chartjs"; import { type ChartData, type ChartOptions } from "chart.js"; +import { formatRelative } from 'date-fns' import colors from "tailwindcss/colors"; import "chartjs-adapter-date-fns"; @@ -157,7 +158,7 @@ const PostPerHourChart: Component<{ data: Resource, time: Accessor ); @@ -178,18 +179,110 @@ const PostPerTime: Component<{ const [data] = createResource(() => [time(), lang] as const, fetcher); return ( -
+

{label}

); }; +interface Post { + createdAt: number, + languages: string[] + text: string + uri: string +} + +const langToName = (lang: string): string => { + switch (lang) { + case "nb": + return "Norwegian bokmål" + case "nn": + return "Norwegian nynorsk" + case "smi": + return "Sami" + default: + return lang + } +} + +const PostFirehose: Component = () => { + const [key, setKey] = createSignal(); // Used to politely close the event source + const [posts, setPosts] = createSignal([]); + const [eventSource, setEventSource] = createSignal(null); + + onMount(() => { + console.log("Mounting event source") + const es = new EventSource(`${host}/dashboard/feed/sse`); + setEventSource(es); + + es.onmessage = (e) => { + if(key() === undefined) { + setKey(e.data); + return; + } + console.log("Message received", e); + const post = JSON.parse(e.data) as Post; + setPosts((posts) => [post, ...posts.slice(0, 499)]); // Limit to 500 posts + }; + }); + + const close = async () => { + console.log("Closing event source"); + eventSource()?.close(); + await fetch(`${host}/dashboard/feed/sse?key=${key()}`, { method: "DELETE" }) + } + + if (import.meta.hot) { + import.meta.hot.accept(close); + } + + window.addEventListener("beforeunload", close) + + + // Display a pretty list of the posts + // Set a max height and use overflow-y: scroll to make it scrollable + // Height should be whatever the parent is. + + return
+

Recent posts

+
+ + {(post) => { + const createdAt = formatRelative(new Date(post.createdAt * 1000), new Date()) + // Match regex to get the profile and post id + // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 + // profile = did:plc:opkjeuzx2lego6a7gueytryu + // post = 3kcbxsslpu623 + + const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/ + const [profile, postId] = regex.exec(post.uri)!.slice(1) + const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}` + return
+
+

{createdAt}

+

{post.languages.map(langToName).join(", ")}

+
+

{post.text}

+ + {/* Link to post on Bluesky */} + +
+ }} +
+
+
; +} + + + const App: Component = () => { const [time, setTime] = createSignal("hour"); return ( -
+
{/* Add a header here showing the Norsky logo and the name */}
Norsky logo @@ -213,6 +306,7 @@ const App: Component = () => { +
diff --git a/frontend/index.css b/frontend/index.css index b5c61c9..810f27b 100644 --- a/frontend/index.css +++ b/frontend/index.css @@ -1,3 +1,18 @@ @tailwind base; @tailwind components; @tailwind utilities; + +@layer utilities { + @variants responsive { + /* Hide scrollbar for Chrome, Safari and Opera */ + .no-scrollbar::-webkit-scrollbar { + display: none; + } + + /* Hide scrollbar for IE, Edge and Firefox */ + .no-scrollbar { + -ms-overflow-style: none; /* IE and Edge */ + scrollbar-width: none; /* Firefox */ + } + } + } \ No newline at end of file diff --git a/go.mod b/go.mod index 9306062..0b751ce 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bluesky-social/indigo v0.0.0-20230919180850-251fff6498dc github.com/cenkalti/backoff/v4 v4.2.1 github.com/cqroot/prompt v0.9.1 - github.com/gofiber/fiber/v2 v2.49.2 + github.com/gofiber/fiber/v2 v2.50.0 github.com/golang-migrate/migrate/v4 v4.16.2 github.com/gorilla/websocket v1.5.0 github.com/huandu/go-sqlbuilder v1.22.0 @@ -15,7 +15,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/strideynet/bsky-furry-feed v0.0.37 github.com/urfave/cli/v2 v2.25.1 - github.com/valyala/fasthttp v1.49.0 + github.com/valyala/fasthttp v1.50.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20230928175846-ec07f4e35b9e ) @@ -116,7 +116,7 @@ require ( golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.12.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index d4806b5..dce3e3b 100644 --- a/go.sum +++ b/go.sum @@ -54,8 +54,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gofiber/fiber/v2 v2.49.2 h1:ONEN3/Vc+dUCxxDgZZwpqvhISgHqb+bu+isBiEyKEQs= -github.com/gofiber/fiber/v2 v2.49.2/go.mod h1:gNsKnyrmfEWFpJxQAV0qvW6l70K1dZGno12oLtukcts= +github.com/gofiber/fiber/v2 v2.50.0 h1:ia0JaB+uw3GpNSCR5nvC5dsaxXjRU5OEu36aytx+zGw= +github.com/gofiber/fiber/v2 v2.50.0/go.mod h1:21eytvay9Is7S6z+OgPi7c7n4++tnClWmhpimVHMimw= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -398,8 +398,8 @@ github.com/urfave/cli/v2 v2.25.1 h1:zw8dSP7ghX0Gmm8vugrs6q9Ku0wzweqPyshy+syu9Gw= github.com/urfave/cli/v2 v2.25.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.49.0 h1:9FdvCpmxB74LH4dPb7IJ1cOSsluR07XG3I1txXWwJpE= -github.com/valyala/fasthttp v1.49.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= +github.com/valyala/fasthttp v1.50.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= @@ -516,8 +516,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/models/models.go b/models/models.go index 6a35f80..483d1b8 100644 --- a/models/models.go +++ b/models/models.go @@ -4,11 +4,17 @@ import "time" // Post model with key fields from the post type Post struct { - Id int64 `json:"-"` - Uri string `json:"post"` - CreatedAt int64 `json:"-"` - Text string `json:"-"` - Languages []string `json:"-"` + Id int64 `json:"id"` + CreatedAt int64 `json:"createdAt"` + Text string `json:"text"` + Languages []string `json:"languages"` + Uri string `json:"uri"` +} + +// Omit all but the Uri field +type FeedPost struct { + Id int64 `json:"-"` + Uri string `json:"post"` } // CreateEvent fired when a new post is created @@ -27,8 +33,8 @@ type DeletePostEvent struct { } type FeedResponse struct { - Feed []Post `json:"feed"` - Cursor *string `json:"cursor"` + Feed []FeedPost `json:"feed"` + Cursor *string `json:"cursor"` } type PostsAggregatedByTime struct { diff --git a/server/server.go b/server/server.go index 72dd105..30c37ab 100644 --- a/server/server.go +++ b/server/server.go @@ -3,12 +3,15 @@ package server import ( "bufio" "embed" + "encoding/json" "fmt" "net/http" "norsky/db" "norsky/feeds" + "norsky/models" "strconv" "strings" + "sync" "time" "github.com/bluesky-social/indigo/api/bsky" @@ -18,6 +21,8 @@ import ( "github.com/gofiber/fiber/v2/middleware/compress" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/filesystem" + "github.com/gofiber/fiber/v2/middleware/requestid" + "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" ) @@ -32,25 +37,77 @@ type ServerConfig struct { // The reader to use for reading posts Reader *db.Reader + + // Broadcast channels to pass posts to SSE clients + Broadcaster *Broadcaster } -// Returns a fiber.App instance to be used as an HTTP server for the norsky feed -func Server(config *ServerConfig) *fiber.App { +// Make it sync +type Broadcaster struct { + sync.Mutex + clients map[string]chan models.CreatePostEvent +} - app := fiber.New() +// Constructor +func NewBroadcaster() *Broadcaster { + return &Broadcaster{ + clients: make(map[string]chan models.CreatePostEvent), + } +} - app.Use(compress.New()) +func (b *Broadcaster) Broadcast(post models.CreatePostEvent) { + log.WithFields(log.Fields{ + "clients": len(b.clients), + }).Info("Broadcasting post to SSE clients") - // Setup CORS for localhost:3001 - app.Use(func(c *fiber.Ctx) error { - corsConfig := cors.Config{ - AllowOrigins: "*", - AllowCredentials: true, - } - return cors.New(corsConfig)(c) - }) + b.Lock() + defer b.Unlock() + for _, client := range b.clients { + client <- post + } +} - // Serve the assets +// Function to add a client to the broadcaster +func (b *Broadcaster) AddClient(key string, client chan models.CreatePostEvent) { + b.Lock() + defer b.Unlock() + b.clients[key] = client + log.WithFields(log.Fields{ + "key": key, + "count": len(b.clients), + }).Info("Adding client to broadcaster") +} + +// Function to remove a client from the broadcaster +func (b *Broadcaster) RemoveClient(key string) { + b.Lock() + defer b.Unlock() + if _, ok := b.clients[key]; !ok { + close(b.clients[key]) + delete(b.clients, key) + } + + log.WithFields(log.Fields{ + "key": key, + "count": len(b.clients), + }).Info("Removed client from broadcaster") +} + +func (b *Broadcaster) Shutdown() { + log.Info("Shutting down broadcaster") + b.Lock() + defer b.Unlock() + for _, client := range b.clients { + close(client) + } +} + +// Returns a fiber.App instance to be used as an HTTP server for the norsky feed +func Server(config *ServerConfig) *fiber.App { + + bc := config.Broadcaster + + app := fiber.New() // Middleware to track the latency of each request app.Use(func(c *fiber.Ctx) error { @@ -72,9 +129,34 @@ func Server(config *ServerConfig) *fiber.App { return err }) + app.Use(requestid.New(requestid.ConfigDefault)) + app.Use(compress.New()) + + // Setup CORS for localhost:3001 + app.Use(func(c *fiber.Ctx) error { + corsConfig := cors.Config{ + AllowOrigins: "*", + AllowHeaders: "Cache-Control", + AllowCredentials: true, + } + return cors.New(corsConfig)(c) + }) + + // Serve the assets + // Setup cache app.Use(cache.New(cache.Config{ Next: func(c *fiber.Ctx) bool { + + if c.Method() != "GET" { + return true + } + + // If the pathname ends with /sse, don't cache + if strings.HasSuffix(c.Path(), "/sse") { + return true + } + // Only cache dashboard requests if strings.HasPrefix(c.Path(), "/dashboard") { log.WithFields(log.Fields{ @@ -196,36 +278,66 @@ func Server(config *ServerConfig) *fiber.App { return c.Status(200).JSON(postsPerTime) }) - app.Get("/dashboard/feed", func(c *fiber.Ctx) error { - // Make a server sent event stream of the feed + app.Delete("/dashboard/feed/sse", func(c *fiber.Ctx) error { + // Get the feed query parameters and parse the limit + key := c.Query("key", "") + bc.RemoveClient(key) + return c.Status(200).SendString("OK") + }) + + app.Get("/dashboard/feed/sse", func(c *fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") c.Set("Transfer-Encoding", "chunked") c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - var i int + // Register the channel to write posts to so server can write to it + key := uuid.New().String() + sseChannel := make(chan models.CreatePostEvent) + aliveChan := time.NewTicker(5 * time.Second) + bc.AddClient(key, sseChannel) + + // A function to cleanup + cleanup := func() { + log.Info("Cleaning up SSE stream ", key) + // Remove sseChannel from the list of channels + bc.RemoveClient(key) + aliveChan.Stop() + } + + // Send the SSE key first so the client can close the connection when it wants to + fmt.Fprintf(w, "data: %s\n\n", key) + w.Flush() + for { - i++ - msg := fmt.Sprintf("%d", i) - fmt.Fprintf(w, "data: Message: %s\n\n", msg) - fmt.Println(msg) - - err := w.Flush() - if err != nil { - // Refreshing page in web browser will establish a new - // SSE connection, but only (the last) one is alive, so - // dead connections must be closed here. - fmt.Printf("Error while flushing: %v. Closing http connection.\n", err) - - break + select { + case <-aliveChan.C: + err := w.Flush() + if err != nil { + cleanup() + return + } + case post := <-sseChannel: + jsonPost, jsonErr := json.Marshal(post.Post) + + if jsonErr != nil { + log.Error("Error marshalling post", jsonErr) + break // Break out of the select statement as we have no post to write + } + + fmt.Fprintf(w, "data: %s\n\n", jsonPost) + err := w.Flush() + if err != nil { + cleanup() + return + } } - time.Sleep(2 * time.Second) } + })) return nil - }) // Serve the Solid dashboard