diff --git a/cmd/serve.go b/cmd/serve.go index e9979dd..5e1089d 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -8,6 +8,7 @@ import ( "fmt" "norsky/db" "norsky/firehose" + "norsky/models" "norsky/server" "os" "os/signal" @@ -80,26 +81,48 @@ func serveCmd() *cli.Command { // Channel for subscribing to bluesky posts postChan := make(chan interface{}) + dbPostChan := make(chan interface{}) // Channel for writing posts to the database + broadcaster := server.NewBroadcaster() // Setup the server and firehose app := server.Server(&server.ServerConfig{ - Hostname: hostname, - Reader: db.NewReader(database), + Hostname: hostname, + Reader: db.NewReader(database), + Broadcaster: broadcaster, }) fh := firehose.New(postChan, ctx.Context) - dbwriter := db.NewWriter(database, postChan) + + dbwriter := db.NewWriter(database, dbPostChan) // Graceful shutdown via wait group c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) var wg sync.WaitGroup + // Graceful shutdown logic go func() { <-c fmt.Println("Gracefully shutting down...") - app.ShutdownWithTimeout(60 * time.Second) - defer wg.Add(-3) // Decrement the waitgroup counter by 2 after shutdown of server and firehose + app.ShutdownWithTimeout(5 * time.Second) // Wait 5 seconds for all connections to close fh.Shutdown() + broadcaster.Shutdown() + defer wg.Add(-4) // Decrement the waitgroup counter by 2 after shutdown of server and firehose + + }() + + // Some glue code to pass posts from the firehose to the database and/or broadcaster + // Ideally one might want to do this in a more elegant way + // TODO: Move broadcaster into server package, i.e. make server a receiver and handle broadcaster and fiber together + go func() { + for post := range postChan { + switch post := post.(type) { + case models.CreatePostEvent: + dbPostChan <- post + broadcaster.Broadcast(post) // Broadcast new post to SSE clients + default: + dbPostChan <- post + } + } }() go func() { @@ -109,6 +132,7 @@ func serveCmd() *cli.Command { go func() { fmt.Println("Starting server...") + if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil { log.Error(err) c <- os.Interrupt @@ -121,13 +145,12 @@ func serveCmd() *cli.Command { }() // Wait for both the server and firehose to shutdown - wg.Add(3) + wg.Add(4) wg.Wait() log.Info("Norsky feed generator stopped") return nil - }, } } diff --git a/db/reader.go b/db/reader.go index abfbf3a..597e40e 100644 --- a/db/reader.go +++ b/db/reader.go @@ -4,11 +4,9 @@ import ( "database/sql" "norsky/models" "strconv" - "strings" "time" sqlbuilder "github.com/huandu/go-sqlbuilder" - log "github.com/sirupsen/logrus" ) type Reader struct { @@ -26,12 +24,12 @@ func NewReader(database string) *Reader { } } -func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Post, error) { +func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.FeedPost, error) { // Return next limit posts after cursor, ordered by created_at and uri sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "uri", "created_at", "group_concat(language)").From("posts") + sb.Select("id", "uri").From("posts") if postId != 0 { sb.Where( sb.LessThan("id", postId), @@ -53,18 +51,15 @@ func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Po defer rows.Close() // Scan rows, collapse languages into single post - var posts []models.Post + var posts []models.FeedPost for rows.Next() { - var post models.Post - var langs string + var post models.FeedPost // Scan row and - if err := rows.Scan(&post.Id, &post.Uri, &post.CreatedAt, &langs); err != nil { + if err := rows.Scan(&post.Id, &post.Uri); err != nil { return nil, err } - // Split languages into a slice and add to the post model - post.Languages = strings.Split(langs, ",") posts = append(posts, post) } @@ -125,11 +120,6 @@ func (reader *Reader) GetPostCountPerTime(lang string, timeAgg string) ([]models sql, args := sb.BuildWithFlavor(sqlbuilder.Flavor(sqlbuilder.SQLite)) - log.WithFields(log.Fields{ - "sql": sql, - "args": args, - }).Info("Get posts per hour") - rows, err := reader.db.Query(sql, args...) if err != nil { return nil, err diff --git a/db/writer.go b/db/writer.go index 76aa05b..a7cf4a7 100644 --- a/db/writer.go +++ b/db/writer.go @@ -39,11 +39,10 @@ func (writer *Writer) Subscribe() { } case post := <-writer.postChan: - log.WithField("post", post).Info("Received post") switch post := post.(type) { - case models.CreatePostEvent: + case *models.CreatePostEvent: createPost(writer.db, post.Post) - case models.DeletePostEvent: + case *models.DeletePostEvent: deletePost(writer.db, post.Post) default: log.Info("Unknown post type") diff --git a/feeds/feeds.go b/feeds/feeds.go index 582d6d7..2a1edee 100644 --- a/feeds/feeds.go +++ b/feeds/feeds.go @@ -37,7 +37,7 @@ func genericAlgo(reader *db.Reader, cursor string, limit int, lang string) (*mod } if posts == nil { - posts = []models.Post{} + posts = []models.FeedPost{} } var nextCursor *string diff --git a/firehose/firehose.go b/firehose/firehose.go index 9452f85..01e4add 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -75,6 +75,7 @@ func (firehose *Firehose) Subscribe() { } func (firehose *Firehose) Shutdown() { + // TODO: Graceful shutdown here as "Error handling repo stream: read tcp use of closed network connection " firehose.scheduler.Shutdown() firehose.conn.Close() fmt.Println("Firehose shutdown") diff --git a/frontend/App.tsx b/frontend/App.tsx index 408aa8d..eb0da7e 100644 --- a/frontend/App.tsx +++ b/frontend/App.tsx @@ -5,13 +5,14 @@ import { createResource, Resource, Accessor, - createComputed, createMemo, - createEffect, + onCleanup, + For, } from "solid-js"; import { Chart, Title, Tooltip, Legend, Colors, TimeScale, ChartDataset, ChartType, Point, TimeUnit, TimeSeriesScale } from "chart.js"; import { Line } from "solid-chartjs"; import { type ChartData, type ChartOptions } from "chart.js"; +import { formatRelative } from 'date-fns' import colors from "tailwindcss/colors"; import "chartjs-adapter-date-fns"; @@ -157,7 +158,7 @@ const PostPerHourChart: Component<{ data: Resource, time: Accessor ); @@ -178,18 +179,110 @@ const PostPerTime: Component<{ const [data] = createResource(() => [time(), lang] as const, fetcher); return ( -
+

{label}

); }; +interface Post { + createdAt: number, + languages: string[] + text: string + uri: string +} + +const langToName = (lang: string): string => { + switch (lang) { + case "nb": + return "Norwegian bokmål" + case "nn": + return "Norwegian nynorsk" + case "smi": + return "Sami" + default: + return lang + } +} + +const PostFirehose: Component = () => { + const [key, setKey] = createSignal(); // Used to politely close the event source + const [posts, setPosts] = createSignal([]); + const [eventSource, setEventSource] = createSignal(null); + + onMount(() => { + console.log("Mounting event source") + const es = new EventSource(`${host}/dashboard/feed/sse`); + setEventSource(es); + + es.onmessage = (e) => { + if(key() === undefined) { + setKey(e.data); + return; + } + console.log("Message received", e); + const post = JSON.parse(e.data) as Post; + setPosts((posts) => [post, ...posts.slice(0, 499)]); // Limit to 500 posts + }; + }); + + const close = async () => { + console.log("Closing event source"); + eventSource()?.close(); + await fetch(`${host}/dashboard/feed/sse?key=${key()}`, { method: "DELETE" }) + } + + if (import.meta.hot) { + import.meta.hot.accept(close); + } + + window.addEventListener("beforeunload", close) + + + // Display a pretty list of the posts + // Set a max height and use overflow-y: scroll to make it scrollable + // Height should be whatever the parent is. + + return
+

Recent posts

+
+ + {(post) => { + const createdAt = formatRelative(new Date(post.createdAt * 1000), new Date()) + // Match regex to get the profile and post id + // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 + // profile = did:plc:opkjeuzx2lego6a7gueytryu + // post = 3kcbxsslpu623 + + const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/ + const [profile, postId] = regex.exec(post.uri)!.slice(1) + const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}` + return
+
+

{createdAt}

+

{post.languages.map(langToName).join(", ")}

+
+

{post.text}

+ + {/* Link to post on Bluesky */} + +
+ }} +
+
+
; +} + + + const App: Component = () => { const [time, setTime] = createSignal("hour"); return ( -
+
{/* Add a header here showing the Norsky logo and the name */}
Norsky logo @@ -213,6 +306,7 @@ const App: Component = () => { +
diff --git a/frontend/index.css b/frontend/index.css index b5c61c9..810f27b 100644 --- a/frontend/index.css +++ b/frontend/index.css @@ -1,3 +1,18 @@ @tailwind base; @tailwind components; @tailwind utilities; + +@layer utilities { + @variants responsive { + /* Hide scrollbar for Chrome, Safari and Opera */ + .no-scrollbar::-webkit-scrollbar { + display: none; + } + + /* Hide scrollbar for IE, Edge and Firefox */ + .no-scrollbar { + -ms-overflow-style: none; /* IE and Edge */ + scrollbar-width: none; /* Firefox */ + } + } + } \ No newline at end of file diff --git a/go.mod b/go.mod index 9306062..0b751ce 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bluesky-social/indigo v0.0.0-20230919180850-251fff6498dc github.com/cenkalti/backoff/v4 v4.2.1 github.com/cqroot/prompt v0.9.1 - github.com/gofiber/fiber/v2 v2.49.2 + github.com/gofiber/fiber/v2 v2.50.0 github.com/golang-migrate/migrate/v4 v4.16.2 github.com/gorilla/websocket v1.5.0 github.com/huandu/go-sqlbuilder v1.22.0 @@ -15,7 +15,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/strideynet/bsky-furry-feed v0.0.37 github.com/urfave/cli/v2 v2.25.1 - github.com/valyala/fasthttp v1.49.0 + github.com/valyala/fasthttp v1.50.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20230928175846-ec07f4e35b9e ) @@ -116,7 +116,7 @@ require ( golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.12.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index d4806b5..dce3e3b 100644 --- a/go.sum +++ b/go.sum @@ -54,8 +54,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gofiber/fiber/v2 v2.49.2 h1:ONEN3/Vc+dUCxxDgZZwpqvhISgHqb+bu+isBiEyKEQs= -github.com/gofiber/fiber/v2 v2.49.2/go.mod h1:gNsKnyrmfEWFpJxQAV0qvW6l70K1dZGno12oLtukcts= +github.com/gofiber/fiber/v2 v2.50.0 h1:ia0JaB+uw3GpNSCR5nvC5dsaxXjRU5OEu36aytx+zGw= +github.com/gofiber/fiber/v2 v2.50.0/go.mod h1:21eytvay9Is7S6z+OgPi7c7n4++tnClWmhpimVHMimw= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -398,8 +398,8 @@ github.com/urfave/cli/v2 v2.25.1 h1:zw8dSP7ghX0Gmm8vugrs6q9Ku0wzweqPyshy+syu9Gw= github.com/urfave/cli/v2 v2.25.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.49.0 h1:9FdvCpmxB74LH4dPb7IJ1cOSsluR07XG3I1txXWwJpE= -github.com/valyala/fasthttp v1.49.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= +github.com/valyala/fasthttp v1.50.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= @@ -516,8 +516,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/models/models.go b/models/models.go index 6a35f80..483d1b8 100644 --- a/models/models.go +++ b/models/models.go @@ -4,11 +4,17 @@ import "time" // Post model with key fields from the post type Post struct { - Id int64 `json:"-"` - Uri string `json:"post"` - CreatedAt int64 `json:"-"` - Text string `json:"-"` - Languages []string `json:"-"` + Id int64 `json:"id"` + CreatedAt int64 `json:"createdAt"` + Text string `json:"text"` + Languages []string `json:"languages"` + Uri string `json:"uri"` +} + +// Omit all but the Uri field +type FeedPost struct { + Id int64 `json:"-"` + Uri string `json:"post"` } // CreateEvent fired when a new post is created @@ -27,8 +33,8 @@ type DeletePostEvent struct { } type FeedResponse struct { - Feed []Post `json:"feed"` - Cursor *string `json:"cursor"` + Feed []FeedPost `json:"feed"` + Cursor *string `json:"cursor"` } type PostsAggregatedByTime struct { diff --git a/server/server.go b/server/server.go index 72dd105..30c37ab 100644 --- a/server/server.go +++ b/server/server.go @@ -3,12 +3,15 @@ package server import ( "bufio" "embed" + "encoding/json" "fmt" "net/http" "norsky/db" "norsky/feeds" + "norsky/models" "strconv" "strings" + "sync" "time" "github.com/bluesky-social/indigo/api/bsky" @@ -18,6 +21,8 @@ import ( "github.com/gofiber/fiber/v2/middleware/compress" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/filesystem" + "github.com/gofiber/fiber/v2/middleware/requestid" + "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" ) @@ -32,25 +37,77 @@ type ServerConfig struct { // The reader to use for reading posts Reader *db.Reader + + // Broadcast channels to pass posts to SSE clients + Broadcaster *Broadcaster } -// Returns a fiber.App instance to be used as an HTTP server for the norsky feed -func Server(config *ServerConfig) *fiber.App { +// Make it sync +type Broadcaster struct { + sync.Mutex + clients map[string]chan models.CreatePostEvent +} - app := fiber.New() +// Constructor +func NewBroadcaster() *Broadcaster { + return &Broadcaster{ + clients: make(map[string]chan models.CreatePostEvent), + } +} - app.Use(compress.New()) +func (b *Broadcaster) Broadcast(post models.CreatePostEvent) { + log.WithFields(log.Fields{ + "clients": len(b.clients), + }).Info("Broadcasting post to SSE clients") - // Setup CORS for localhost:3001 - app.Use(func(c *fiber.Ctx) error { - corsConfig := cors.Config{ - AllowOrigins: "*", - AllowCredentials: true, - } - return cors.New(corsConfig)(c) - }) + b.Lock() + defer b.Unlock() + for _, client := range b.clients { + client <- post + } +} - // Serve the assets +// Function to add a client to the broadcaster +func (b *Broadcaster) AddClient(key string, client chan models.CreatePostEvent) { + b.Lock() + defer b.Unlock() + b.clients[key] = client + log.WithFields(log.Fields{ + "key": key, + "count": len(b.clients), + }).Info("Adding client to broadcaster") +} + +// Function to remove a client from the broadcaster +func (b *Broadcaster) RemoveClient(key string) { + b.Lock() + defer b.Unlock() + if _, ok := b.clients[key]; !ok { + close(b.clients[key]) + delete(b.clients, key) + } + + log.WithFields(log.Fields{ + "key": key, + "count": len(b.clients), + }).Info("Removed client from broadcaster") +} + +func (b *Broadcaster) Shutdown() { + log.Info("Shutting down broadcaster") + b.Lock() + defer b.Unlock() + for _, client := range b.clients { + close(client) + } +} + +// Returns a fiber.App instance to be used as an HTTP server for the norsky feed +func Server(config *ServerConfig) *fiber.App { + + bc := config.Broadcaster + + app := fiber.New() // Middleware to track the latency of each request app.Use(func(c *fiber.Ctx) error { @@ -72,9 +129,34 @@ func Server(config *ServerConfig) *fiber.App { return err }) + app.Use(requestid.New(requestid.ConfigDefault)) + app.Use(compress.New()) + + // Setup CORS for localhost:3001 + app.Use(func(c *fiber.Ctx) error { + corsConfig := cors.Config{ + AllowOrigins: "*", + AllowHeaders: "Cache-Control", + AllowCredentials: true, + } + return cors.New(corsConfig)(c) + }) + + // Serve the assets + // Setup cache app.Use(cache.New(cache.Config{ Next: func(c *fiber.Ctx) bool { + + if c.Method() != "GET" { + return true + } + + // If the pathname ends with /sse, don't cache + if strings.HasSuffix(c.Path(), "/sse") { + return true + } + // Only cache dashboard requests if strings.HasPrefix(c.Path(), "/dashboard") { log.WithFields(log.Fields{ @@ -196,36 +278,66 @@ func Server(config *ServerConfig) *fiber.App { return c.Status(200).JSON(postsPerTime) }) - app.Get("/dashboard/feed", func(c *fiber.Ctx) error { - // Make a server sent event stream of the feed + app.Delete("/dashboard/feed/sse", func(c *fiber.Ctx) error { + // Get the feed query parameters and parse the limit + key := c.Query("key", "") + bc.RemoveClient(key) + return c.Status(200).SendString("OK") + }) + + app.Get("/dashboard/feed/sse", func(c *fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") c.Set("Transfer-Encoding", "chunked") c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - var i int + // Register the channel to write posts to so server can write to it + key := uuid.New().String() + sseChannel := make(chan models.CreatePostEvent) + aliveChan := time.NewTicker(5 * time.Second) + bc.AddClient(key, sseChannel) + + // A function to cleanup + cleanup := func() { + log.Info("Cleaning up SSE stream ", key) + // Remove sseChannel from the list of channels + bc.RemoveClient(key) + aliveChan.Stop() + } + + // Send the SSE key first so the client can close the connection when it wants to + fmt.Fprintf(w, "data: %s\n\n", key) + w.Flush() + for { - i++ - msg := fmt.Sprintf("%d", i) - fmt.Fprintf(w, "data: Message: %s\n\n", msg) - fmt.Println(msg) - - err := w.Flush() - if err != nil { - // Refreshing page in web browser will establish a new - // SSE connection, but only (the last) one is alive, so - // dead connections must be closed here. - fmt.Printf("Error while flushing: %v. Closing http connection.\n", err) - - break + select { + case <-aliveChan.C: + err := w.Flush() + if err != nil { + cleanup() + return + } + case post := <-sseChannel: + jsonPost, jsonErr := json.Marshal(post.Post) + + if jsonErr != nil { + log.Error("Error marshalling post", jsonErr) + break // Break out of the select statement as we have no post to write + } + + fmt.Fprintf(w, "data: %s\n\n", jsonPost) + err := w.Flush() + if err != nil { + cleanup() + return + } } - time.Sleep(2 * time.Second) } + })) return nil - }) // Serve the Solid dashboard