Skip to content

Commit

Permalink
feat: Keyword based feeds (#22)
Browse files Browse the repository at this point in the history
Allows creating feeds selecting on multiple keywords. To allow creating
feeds on new keywords that go back in time before the feed was created
we now store text and a full text search index to facilitate search
based look up.
  • Loading branch information
snorremd authored Jan 18, 2025
1 parent 9c95821 commit 395fbcb
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 453 deletions.
37 changes: 4 additions & 33 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ func serveCmd() *cli.Command {
firehoseCtx := context.Context(ctx.Context)
livenessTicker := time.NewTicker(15 * time.Minute)
postChan := make(chan interface{}, 1000)
statisticsChan := make(chan models.StatisticsEvent, 1000)
dbPostChan := make(chan interface{}) // Channel for writing posts to the database
broadcaster := server.NewBroadcaster() // SSE broadcaster
dbPostChan := make(chan interface{}) // Channel for writing posts to the database

dbReader := db.NewReader(database)
seq, err := dbReader.GetSequence()
Expand Down Expand Up @@ -171,10 +169,9 @@ func serveCmd() *cli.Command {

// Create the server
app := server.Server(&server.ServerConfig{
Hostname: hostname,
Reader: dbReader,
Broadcaster: broadcaster,
Feeds: feedMap,
Hostname: hostname,
Reader: dbReader,
Feeds: feedMap,
})

// Some glue code to pass posts from the firehose to the database and/or broadcaster
Expand All @@ -188,30 +185,14 @@ func serveCmd() *cli.Command {
}()
for post := range postChan {
switch post := post.(type) {
// Don't crash if broadcast fails
case models.CreatePostEvent:
dbPostChan <- post
// Broadcast without blocking
go broadcaster.BroadcastCreatePost(post) // Broadcast new post to SSE clients
default:
dbPostChan <- post
}
}
}()

// Glue code to pass statistics events to the broadcaster
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in statistics broadcast routine: %v", r)
}
}()

for stats := range statisticsChan {
broadcaster.BroadcastStatistics(stats)
}
}()

go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -226,16 +207,6 @@ func serveCmd() *cli.Command {
})
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in monitor firehose routine: %v", r)
}
}()
fmt.Println("Starting statistics monitor...")
firehose.MonitorFirehoseStats(ctx.Context, statisticsChan)
}()

go func() {
<-ctx.Done()
log.Info("Context canceled with reason:", ctx.Err())
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type FeedConfig struct {
Description string `toml:"description"`
AvatarPath string `toml:"avatar_path,omitempty"`
Languages []string `toml:"languages"`
Keywords []string `toml:"keywords,omitempty"`
}

type Config struct {
Expand Down
10 changes: 10 additions & 0 deletions db/migrations/20250109194601_add_text_column.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Drop triggers first
DROP TRIGGER IF EXISTS posts_ai;
DROP TRIGGER IF EXISTS posts_ad;
DROP TRIGGER IF EXISTS posts_au;

-- Drop FTS table
DROP TABLE IF EXISTS posts_fts;

-- Remove text column from posts
ALTER TABLE posts DROP COLUMN text;
25 changes: 25 additions & 0 deletions db/migrations/20250109194601_add_text_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
ALTER TABLE posts ADD COLUMN text TEXT;

-- Create virtual FTS table
CREATE VIRTUAL TABLE posts_fts USING fts5(
text,
content='posts',
content_rowid='id',
tokenize='unicode61'
);

-- Trigger to keep FTS table in sync on insert
CREATE TRIGGER posts_ai AFTER INSERT ON posts BEGIN
INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text);
END;

-- Trigger to keep FTS table in sync on delete
CREATE TRIGGER posts_ad AFTER DELETE ON posts BEGIN
INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text);
END;

-- Trigger to keep FTS table in sync on update
CREATE TRIGGER posts_au AFTER UPDATE ON posts BEGIN
INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text);
INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text);
END;
26 changes: 21 additions & 5 deletions db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"norsky/models"
"strconv"
"strings"
"time"

sqlbuilder "github.com/huandu/go-sqlbuilder"
Expand All @@ -25,22 +26,37 @@ func NewReader(database string) *Reader {
}
}

func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models.FeedPost, error) {
func (reader *Reader) GetFeed(langs []string, keywords []string, limit int, postId int64) ([]models.FeedPost, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("DISTINCT posts.id", "posts.uri").From("posts")

if postId != 0 {
sb.Where(sb.LessThan("posts.id", postId))
}

// Build language conditions if specified
if len(langs) > 0 {
sb.Join("post_languages", "posts.id = post_languages.post_id")
// Build OR conditions for each language
conditions := make([]string, len(langs))
langConditions := make([]string, len(langs))
for i, lang := range langs {
conditions[i] = sb.Equal("post_languages.language", lang)
langConditions[i] = sb.Equal("post_languages.language", lang)
}
sb.Where(sb.Or(conditions...))
sb.Where(sb.Or(langConditions...))
}

// Use FTS search for keywords if specified
if len(keywords) > 0 {
// Join with FTS table and build search query
sb.Join("posts_fts", "posts.id = posts_fts.rowid")
searchTerms := make([]string, len(keywords))
for i, keyword := range keywords {
// Escape quotes and use * for prefix matching
escaped := strings.ReplaceAll(keyword, "'", "''")
searchTerms[i] = fmt.Sprintf("%s*", escaped)
}
// Combine terms with OR
searchQuery := strings.Join(searchTerms, " OR ")
sb.Where(fmt.Sprintf("posts_fts MATCH '%s'", searchQuery))
}

sb.OrderBy("posts.id").Desc()
Expand Down
58 changes: 35 additions & 23 deletions db/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"fmt"
"norsky/models"
"time"

Expand Down Expand Up @@ -66,45 +67,56 @@ func processSeq(db *sql.DB, evt models.ProcessSeqEvent) error {
return nil
}

func createPost(db *sql.DB, post models.Post) error {
func createPost(db *sql.DB, post models.Post) (int64, error) {
log.WithFields(log.Fields{
"uri": post.Uri,
"languages": post.Languages,
"uri": post.Uri,
"languages": post.Languages,
"created_at": time.Unix(post.CreatedAt, 0).Format(time.RFC3339),
// Record lag from when the post was created to when it was processed
"lagSeconds": time.Since(time.Unix(post.CreatedAt, 0)).Seconds(),
}).Info("Creating post")

// Start a transaction since we need to insert into multiple tables
tx, err := db.Begin()
if err != nil {
return 0, fmt.Errorf("transaction error: %w", err)
}
defer tx.Rollback()

// Post insert query
insertPost := sqlbuilder.NewInsertBuilder()
sql, args := insertPost.InsertInto("posts").Cols("uri", "created_at").Values(post.Uri, post.CreatedAt).Build()
insertPost.InsertInto("posts").Cols("uri", "created_at", "text")
insertPost.Values(post.Uri, post.CreatedAt, post.Text)

// Spread args
res, err := db.Exec(sql, args...)
sql, args := insertPost.Build()

result, err := tx.Exec(sql, args...)
if err != nil {
log.Error("Error inserting post", err)
return err
return 0, fmt.Errorf("insert error: %w", err)
}

// Get inserted id
id, err := res.LastInsertId()
postId, err := result.LastInsertId()
if err != nil {
log.Error("Error getting inserted id", err)
return err
return 0, fmt.Errorf("last insert id error: %w", err)
}

// Post languages insert query
insertLangs := sqlbuilder.NewInsertBuilder()
insertLangs.InsertInto("post_languages").Cols("post_id", "language")
// Insert languages
for _, lang := range post.Languages {
insertLangs.Values(id, lang)
}
sql, args = insertLangs.Build()
insertLang := sqlbuilder.NewInsertBuilder()
insertLang.InsertInto("post_languages").Cols("post_id", "language")
insertLang.Values(postId, lang)

_, err = db.Exec(sql, args...)
sql, args := insertLang.Build()
if _, err := tx.Exec(sql, args...); err != nil {
return 0, fmt.Errorf("language insert error: %w", err)
}
}

if err != nil {
log.Error("Error inserting languages", err)
return err
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit error: %w", err)
}

return nil
return postId, nil
}

func deletePost(db *sql.DB, post models.Post) error {
Expand Down
16 changes: 16 additions & 0 deletions feeds.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,19 @@ display_name = "Norsk (Norwegian)"
description = "A feed of Bluesky posts written in Norwegian bokmål, nynorsk and sami"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no", "se"]

[[feeds]]
id = "tv-shows"
display_name = "Norwegian TV Shows"
description = "A feed of Bluesky posts about Norwegian TV shows"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no"]
keywords = ["tv-serie", "tvserie", "nrk", "tv2", "netflix"]

[[feeds]]
id = "tech"
display_name = "Norwegian Tech"
description = "A feed of Bluesky posts about technology in Norwegian"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no"]
keywords = ["teknologi", "programmering", "koding", "kunstig intelligens", "ai"]
12 changes: 7 additions & 5 deletions feeds/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Algorithm func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error)

// Reuse genericAlgo for all algorithms
func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string) (*models.FeedResponse, error) {
func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string, keywords []string) (*models.FeedResponse, error) {
postId := safeParseCursor(cursor)

posts, err := reader.GetFeed(languages, limit+1, postId)
posts, err := reader.GetFeed(languages, keywords, limit+1, postId)
if err != nil {
log.Error("Error getting feed", err)
return nil, err
Expand Down Expand Up @@ -57,6 +57,7 @@ type Feed struct {
Description string
AvatarPath string
Languages []string
Keywords []string
Algorithm Algorithm
}

Expand All @@ -71,16 +72,17 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed {
Description: feedCfg.Description,
AvatarPath: feedCfg.AvatarPath,
Languages: feedCfg.Languages,
Algorithm: createAlgorithm(feedCfg.Languages),
Keywords: feedCfg.Keywords,
Algorithm: createAlgorithm(feedCfg.Languages, feedCfg.Keywords),
}
}

return feeds
}

// Helper function to create an algorithm based on languages
func createAlgorithm(languages []string) Algorithm {
func createAlgorithm(languages []string, keywords []string) Algorithm {
return func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) {
return genericAlgo(reader, cursor, limit, languages)
return genericAlgo(reader, cursor, limit, languages, keywords)
}
}
Loading

0 comments on commit 395fbcb

Please sign in to comment.