diff --git a/cmd/serve.go b/cmd/serve.go index 43975a2..ba952b9 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -114,9 +114,7 @@ func serveCmd() *cli.Command { firehoseCtx := context.Context(ctx.Context) livenessTicker := time.NewTicker(15 * time.Minute) postChan := make(chan interface{}, 1000) - statisticsChan := make(chan models.StatisticsEvent, 1000) - dbPostChan := make(chan interface{}) // Channel for writing posts to the database - broadcaster := server.NewBroadcaster() // SSE broadcaster + dbPostChan := make(chan interface{}) // Channel for writing posts to the database dbReader := db.NewReader(database) seq, err := dbReader.GetSequence() @@ -171,10 +169,9 @@ func serveCmd() *cli.Command { // Create the server app := server.Server(&server.ServerConfig{ - Hostname: hostname, - Reader: dbReader, - Broadcaster: broadcaster, - Feeds: feedMap, + Hostname: hostname, + Reader: dbReader, + Feeds: feedMap, }) // Some glue code to pass posts from the firehose to the database and/or broadcaster @@ -188,30 +185,14 @@ func serveCmd() *cli.Command { }() for post := range postChan { switch post := post.(type) { - // Don't crash if broadcast fails case models.CreatePostEvent: dbPostChan <- post - // Broadcast without blocking - go broadcaster.BroadcastCreatePost(post) // Broadcast new post to SSE clients default: dbPostChan <- post } } }() - // Glue code to pass statistics events to the broadcaster - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Recovered from panic in statistics broadcast routine: %v", r) - } - }() - - for stats := range statisticsChan { - broadcaster.BroadcastStatistics(stats) - } - }() - go func() { defer func() { if r := recover(); r != nil { @@ -226,16 +207,6 @@ func serveCmd() *cli.Command { }) }() - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Recovered from panic in monitor firehose routine: %v", r) - } - }() - fmt.Println("Starting statistics monitor...") - firehose.MonitorFirehoseStats(ctx.Context, statisticsChan) - }() - go func() { <-ctx.Done() log.Info("Context canceled with reason:", ctx.Err()) diff --git a/config/config.go b/config/config.go index 8d4c9e2..9ac13bd 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type FeedConfig struct { Description string `toml:"description"` AvatarPath string `toml:"avatar_path,omitempty"` Languages []string `toml:"languages"` + Keywords []string `toml:"keywords,omitempty"` } type Config struct { diff --git a/db/migrations/20250109194601_add_text_column.down.sql b/db/migrations/20250109194601_add_text_column.down.sql new file mode 100644 index 0000000..8bc00a0 --- /dev/null +++ b/db/migrations/20250109194601_add_text_column.down.sql @@ -0,0 +1,10 @@ +-- Drop triggers first +DROP TRIGGER IF EXISTS posts_ai; +DROP TRIGGER IF EXISTS posts_ad; +DROP TRIGGER IF EXISTS posts_au; + +-- Drop FTS table +DROP TABLE IF EXISTS posts_fts; + +-- Remove text column from posts +ALTER TABLE posts DROP COLUMN text; diff --git a/db/migrations/20250109194601_add_text_column.up.sql b/db/migrations/20250109194601_add_text_column.up.sql new file mode 100644 index 0000000..dc9b07a --- /dev/null +++ b/db/migrations/20250109194601_add_text_column.up.sql @@ -0,0 +1,25 @@ +ALTER TABLE posts ADD COLUMN text TEXT; + +-- Create virtual FTS table +CREATE VIRTUAL TABLE posts_fts USING fts5( + text, + content='posts', + content_rowid='id', + tokenize='unicode61' +); + +-- Trigger to keep FTS table in sync on insert +CREATE TRIGGER posts_ai AFTER INSERT ON posts BEGIN + INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text); +END; + +-- Trigger to keep FTS table in sync on delete +CREATE TRIGGER posts_ad AFTER DELETE ON posts BEGIN + INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text); +END; + +-- Trigger to keep FTS table in sync on update +CREATE TRIGGER posts_au AFTER UPDATE ON posts BEGIN + INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text); + INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text); +END; \ No newline at end of file diff --git a/db/reader.go b/db/reader.go index ede6973..6d0afc6 100644 --- a/db/reader.go +++ b/db/reader.go @@ -5,6 +5,7 @@ import ( "fmt" "norsky/models" "strconv" + "strings" "time" sqlbuilder "github.com/huandu/go-sqlbuilder" @@ -25,7 +26,7 @@ func NewReader(database string) *Reader { } } -func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models.FeedPost, error) { +func (reader *Reader) GetFeed(langs []string, keywords []string, limit int, postId int64) ([]models.FeedPost, error) { sb := sqlbuilder.NewSelectBuilder() sb.Select("DISTINCT posts.id", "posts.uri").From("posts") @@ -33,14 +34,29 @@ func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models sb.Where(sb.LessThan("posts.id", postId)) } + // Build language conditions if specified if len(langs) > 0 { sb.Join("post_languages", "posts.id = post_languages.post_id") - // Build OR conditions for each language - conditions := make([]string, len(langs)) + langConditions := make([]string, len(langs)) for i, lang := range langs { - conditions[i] = sb.Equal("post_languages.language", lang) + langConditions[i] = sb.Equal("post_languages.language", lang) } - sb.Where(sb.Or(conditions...)) + sb.Where(sb.Or(langConditions...)) + } + + // Use FTS search for keywords if specified + if len(keywords) > 0 { + // Join with FTS table and build search query + sb.Join("posts_fts", "posts.id = posts_fts.rowid") + searchTerms := make([]string, len(keywords)) + for i, keyword := range keywords { + // Escape quotes and use * for prefix matching + escaped := strings.ReplaceAll(keyword, "'", "''") + searchTerms[i] = fmt.Sprintf("%s*", escaped) + } + // Combine terms with OR + searchQuery := strings.Join(searchTerms, " OR ") + sb.Where(fmt.Sprintf("posts_fts MATCH '%s'", searchQuery)) } sb.OrderBy("posts.id").Desc() diff --git a/db/writer.go b/db/writer.go index 22bc79e..052469d 100644 --- a/db/writer.go +++ b/db/writer.go @@ -2,6 +2,7 @@ package db import ( "context" + "fmt" "norsky/models" "time" @@ -66,45 +67,56 @@ func processSeq(db *sql.DB, evt models.ProcessSeqEvent) error { return nil } -func createPost(db *sql.DB, post models.Post) error { +func createPost(db *sql.DB, post models.Post) (int64, error) { log.WithFields(log.Fields{ - "uri": post.Uri, - "languages": post.Languages, + "uri": post.Uri, + "languages": post.Languages, + "created_at": time.Unix(post.CreatedAt, 0).Format(time.RFC3339), + // Record lag from when the post was created to when it was processed + "lagSeconds": time.Since(time.Unix(post.CreatedAt, 0)).Seconds(), }).Info("Creating post") + + // Start a transaction since we need to insert into multiple tables + tx, err := db.Begin() + if err != nil { + return 0, fmt.Errorf("transaction error: %w", err) + } + defer tx.Rollback() + // Post insert query insertPost := sqlbuilder.NewInsertBuilder() - sql, args := insertPost.InsertInto("posts").Cols("uri", "created_at").Values(post.Uri, post.CreatedAt).Build() + insertPost.InsertInto("posts").Cols("uri", "created_at", "text") + insertPost.Values(post.Uri, post.CreatedAt, post.Text) - // Spread args - res, err := db.Exec(sql, args...) + sql, args := insertPost.Build() + + result, err := tx.Exec(sql, args...) if err != nil { - log.Error("Error inserting post", err) - return err + return 0, fmt.Errorf("insert error: %w", err) } - // Get inserted id - id, err := res.LastInsertId() + postId, err := result.LastInsertId() if err != nil { - log.Error("Error getting inserted id", err) - return err + return 0, fmt.Errorf("last insert id error: %w", err) } - // Post languages insert query - insertLangs := sqlbuilder.NewInsertBuilder() - insertLangs.InsertInto("post_languages").Cols("post_id", "language") + // Insert languages for _, lang := range post.Languages { - insertLangs.Values(id, lang) - } - sql, args = insertLangs.Build() + insertLang := sqlbuilder.NewInsertBuilder() + insertLang.InsertInto("post_languages").Cols("post_id", "language") + insertLang.Values(postId, lang) - _, err = db.Exec(sql, args...) + sql, args := insertLang.Build() + if _, err := tx.Exec(sql, args...); err != nil { + return 0, fmt.Errorf("language insert error: %w", err) + } + } - if err != nil { - log.Error("Error inserting languages", err) - return err + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("commit error: %w", err) } - return nil + return postId, nil } func deletePost(db *sql.DB, post models.Post) error { diff --git a/feeds.example.toml b/feeds.example.toml index ffd7479..f5fd7ad 100644 --- a/feeds.example.toml +++ b/feeds.example.toml @@ -25,3 +25,19 @@ display_name = "Norsk (Norwegian)" description = "A feed of Bluesky posts written in Norwegian bokmål, nynorsk and sami" avatar_path = "./assets/avatar.png" languages = ["nb", "nn", "no", "se"] + +[[feeds]] +id = "tv-shows" +display_name = "Norwegian TV Shows" +description = "A feed of Bluesky posts about Norwegian TV shows" +avatar_path = "./assets/avatar.png" +languages = ["nb", "nn", "no"] +keywords = ["tv-serie", "tvserie", "nrk", "tv2", "netflix"] + +[[feeds]] +id = "tech" +display_name = "Norwegian Tech" +description = "A feed of Bluesky posts about technology in Norwegian" +avatar_path = "./assets/avatar.png" +languages = ["nb", "nn", "no"] +keywords = ["teknologi", "programmering", "koding", "kunstig intelligens", "ai"] diff --git a/feeds/feeds.go b/feeds/feeds.go index 88efc5f..9755200 100644 --- a/feeds/feeds.go +++ b/feeds/feeds.go @@ -13,10 +13,10 @@ import ( type Algorithm func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) // Reuse genericAlgo for all algorithms -func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string) (*models.FeedResponse, error) { +func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string, keywords []string) (*models.FeedResponse, error) { postId := safeParseCursor(cursor) - posts, err := reader.GetFeed(languages, limit+1, postId) + posts, err := reader.GetFeed(languages, keywords, limit+1, postId) if err != nil { log.Error("Error getting feed", err) return nil, err @@ -57,6 +57,7 @@ type Feed struct { Description string AvatarPath string Languages []string + Keywords []string Algorithm Algorithm } @@ -71,7 +72,8 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed { Description: feedCfg.Description, AvatarPath: feedCfg.AvatarPath, Languages: feedCfg.Languages, - Algorithm: createAlgorithm(feedCfg.Languages), + Keywords: feedCfg.Keywords, + Algorithm: createAlgorithm(feedCfg.Languages, feedCfg.Keywords), } } @@ -79,8 +81,8 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed { } // Helper function to create an algorithm based on languages -func createAlgorithm(languages []string) Algorithm { +func createAlgorithm(languages []string, keywords []string) Algorithm { return func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) { - return genericAlgo(reader, cursor, limit, languages) + return genericAlgo(reader, cursor, limit, languages, keywords) } } diff --git a/firehose/firehose.go b/firehose/firehose.go index 51189eb..b9f47b7 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -8,52 +8,70 @@ import ( "io" "net/http" "norsky/models" - "runtime" "strings" "sync" - "sync/atomic" "time" "unicode" "unicode/utf8" "log/slog" + "net" + "github.com/bluesky-social/indigo/api/atproto" appbsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/events" - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" + "github.com/bluesky-social/indigo/events/schedulers/sequential" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" "github.com/bluesky-social/indigo/repomgr" "github.com/cenkalti/backoff/v4" "github.com/gorilla/websocket" lingua "github.com/pemistahl/lingua-go" + "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" log "github.com/sirupsen/logrus" ) // Some constants to optimize the firehose +// Allow these to grow to 10MB const ( - wsReadBufferSize = 1024 * 16 // 16KB - wsWriteBufferSize = 1024 * 16 // 16KB - eventBufferSize = 10000 // Increase from 1000 + wsReadBufferSize = 1024 * 1024 * 2 // 2MB + wsWriteBufferSize = 1024 * 1024 * 2 // 2MB + wsReadTimeout = 90 * time.Second // Increased from 60s + wsWriteTimeout = 15 * time.Second // Increased from 10s + wsPingInterval = 30 * time.Second // Reduced from 60s ) -// We use all languages so as to reliably separate Norwegian from other European languages -var detector lingua.LanguageDetector +// Add these metrics +var ( + wsMessageProcessingTime = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "norsky_ws_message_processing_seconds", + Help: "Time spent processing websocket messages", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), + }) + + wsMessageBacklog = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "norsky_ws_message_backlog", + Help: "Number of messages waiting to be processed", + }) +) -func InitDetector() { - if detector == nil { - detector = lingua.NewLanguageDetectorBuilder().FromLanguages(lingua.AllLanguages()...).WithMinimumRelativeDistance(0.25).Build() - } +func init() { + prometheus.MustRegister(wsMessageProcessingTime) + prometheus.MustRegister(wsMessageBacklog) } -// Keep track of processed event and posts count to show stats in the web interface -var ( - processedEvents int64 - processedPosts int64 -) +func NewLanguageDetector(targetLangs []lingua.Language) lingua.LanguageDetector { + // Always include English plus target languages + languages := lingua.AllLanguages() + + return lingua.NewLanguageDetectorBuilder(). + FromLanguages(languages...). + WithMinimumRelativeDistance(0.25). + Build() +} // Add a pool for the FeedPost struct to reduce GC pressure // Instead of allocating new FeedPost structs for every post, @@ -175,7 +193,6 @@ func ContainsRepetitivePattern(text string) bool { minRepeats = 2 } if repeats >= minRepeats { - log.Debugf("Found repeating pattern '%v' (%d times)", pattern, repeats) return true } } else { @@ -247,19 +264,16 @@ func ContainsSpamContent(text string) bool { // If more than 5 hashtags, consider it spam if hashtagCount > 5 { - log.Infof("Skipping spam post with many hashtags: %s", text) return true } // If more than 5 mentions, consider it spam if mentionCount > 5 { - log.Infof("Skipping spam post with many mentions: %s", text) return true } // Check for repeated hashtags or mentions (common spam pattern) if strings.Count(text, "##") > 0 || strings.Count(text, "@@") > 0 { - log.Infof("Skipping spam post with repeated hashtags/mentions: %s", text) return true } @@ -270,7 +284,6 @@ func ContainsSpamContent(text string) bool { symbolRatio := float64(hashtagCount+mentionCount) / float64(len(words)) // If more than 50% of words are hashtags or mentions combined, consider it spam if symbolRatio > 0.5 { - log.Infof("Skipping spam post with high hashtag/mention ratio: %s", text) return true } } @@ -288,11 +301,10 @@ type FirehoseConfig struct { // Subscribe to the firehose using the Firehose struct as a receiver func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Ticker, seq int64, config FirehoseConfig) { - InitDetector() - address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" headers := http.Header{} headers.Set("User-Agent", "NorSky: https://github.com/snorremd/norsky") + headers.Set("Accept-Encoding", "gzip") if seq >= 0 { log.Info("Starting from sequence: ", seq) @@ -303,12 +315,16 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick dialer := websocket.Dialer{ ReadBufferSize: wsReadBufferSize, WriteBufferSize: wsWriteBufferSize, - HandshakeTimeout: 30 * time.Second, + HandshakeTimeout: 45 * time.Second, + NetDialContext: (&net.Dialer{ + Timeout: 45 * time.Second, + KeepAlive: 45 * time.Second, + }).DialContext, } backoff := backoff.NewExponentialBackOff() - backoff.InitialInterval = 1 * time.Second - backoff.MaxInterval = 600 * time.Second + backoff.InitialInterval = 100 * time.Millisecond + backoff.MaxInterval = 30 * time.Second backoff.Multiplier = 1.5 backoff.MaxElapsedTime = 0 @@ -330,14 +346,14 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick backoff.Reset() // Set initial deadlines - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + conn.SetReadDeadline(time.Now().Add(wsReadTimeout)) + conn.SetWriteDeadline(time.Now().Add(wsWriteTimeout)) - // Start ping ticker - pingTicker := time.NewTicker(60 * time.Second) + // Start ping ticker with shorter interval + pingTicker := time.NewTicker(wsPingInterval) defer pingTicker.Stop() - // Start ping goroutine + // Update ping goroutine go func() { for { select { @@ -345,13 +361,13 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick return case <-pingTicker.C: log.Debug("Sending ping to check connection") - if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(wsWriteTimeout)); err != nil { log.Warn("Ping failed, closing connection for restart: ", err) conn.Close() return } // Reset read deadline after successful ping - if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(wsReadTimeout)); err != nil { log.Warn("Failed to set read deadline, closing connection: ", err) conn.Close() return @@ -360,6 +376,12 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick } }() + // Add connection close handler + conn.SetCloseHandler(func(code int, text string) error { + log.Infof("WebSocket connection closed with code %d: %s", code, text) + return nil + }) + // Remove pong handler since server doesn't respond // Keep ping handler for completeness conn.SetPingHandler(func(appData string) error { @@ -367,16 +389,20 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick return conn.SetReadDeadline(time.Now().Add(60 * time.Second)) }) - scheduler := autoscaling.NewScheduler( - autoscaling.AutoscaleSettings{ - MaxConcurrency: runtime.NumCPU(), - Concurrency: 2, - AutoscaleFrequency: 5 * time.Second, - ThroughputBucketDuration: 1 * time.Second, - ThroughputBucketCount: 10, - }, + scheduler := sequential.NewScheduler( + //runtime.NumCPU(), + //100, + // autoscaling.AutoscaleSettings{ + + // MaxConcurrency: runtime.NumCPU() * 2, + // Concurrency: runtime.NumCPU() * 2, + // AutoscaleFrequency: 10 * time.Second, + // ThroughputBucketDuration: 2 * time.Second, + // ThroughputBucketCount: 15, + // }, conn.RemoteAddr().String(), eventProcessor(postChan, ctx, ticker, config).EventHandler) + err = events.HandleRepoStream(ctx, conn, scheduler, slog.Default()) // If error sleep @@ -401,27 +427,6 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick } } -func MonitorFirehoseStats(ctx context.Context, statisticsChan chan models.StatisticsEvent) { - ticker := time.NewTicker(5 * time.Second) - for { - select { - case <-ticker.C: - // Send statistics event - statisticsChan <- models.StatisticsEvent{ - // Divide by 5 and round to get average per second - EventsPerSecond: atomic.LoadInt64(&processedEvents) / 5, - PostsPerSecond: atomic.LoadInt64(&processedPosts) / 5, - } - // Reset processed events and posts - atomic.StoreInt64(&processedEvents, 0) - atomic.StoreInt64(&processedPosts, 0) - case <-ctx.Done(): - log.Info("Stopping statistics ticker") - return - } - } -} - // Add new types to help organize the code type PostProcessor struct { postChan chan interface{} @@ -430,51 +435,37 @@ type PostProcessor struct { config FirehoseConfig targetLanguages []lingua.Language supportedLanguages map[lingua.Language]string + languageDetector lingua.LanguageDetector // We should have one detector per worker } // Rename to DetectLanguage since it's no longer Norwegian-specific func (p *PostProcessor) DetectLanguage(text string, currentLangs []string, targetLangs []lingua.Language) (bool, []string) { - detectedLang, exists := detector.DetectLanguageOf(text) - if !exists || detectedLang == lingua.English { - return false, currentLangs - } - - // Check if detected language is one of our target languages - isTargetLang := false - for _, lang := range targetLangs { - if detectedLang == lang { - isTargetLang = true - break - } - } - if !isTargetLang { - return false, currentLangs - } - - // Get confidence scores for target languages var highestConf float64 - var detectedLingua lingua.Language + var detectedLang lingua.Language - for _, lang := range targetLangs { - conf := detector.ComputeLanguageConfidence(text, lang) + // Check confidence for English and all target languages + for _, lang := range append([]lingua.Language{lingua.English}, targetLangs...) { + conf := p.languageDetector.ComputeLanguageConfidence(text, lang) if conf > highestConf { highestConf = conf - detectedLingua = lang + detectedLang = lang } - log.Infof("%s confidence: %.2f (threshold: %.2f)", - lang.String(), conf, p.config.ConfidenceThreshold) } - if highestConf < p.config.ConfidenceThreshold { + // If confidence is too low or detected language is English, skip + if highestConf < p.config.ConfidenceThreshold || detectedLang == lingua.English { return false, currentLangs } + log.Infof("%s confidence: %.2f (threshold: %.2f)", + detectedLang.String(), highestConf, p.config.ConfidenceThreshold) + // Create new slice to avoid modifying the input updatedLangs := make([]string, len(currentLangs)) copy(updatedLangs, currentLangs) // Map lingua language to ISO code - langCode := linguaToISO(detectedLingua, p.supportedLanguages) + langCode := linguaToISO(detectedLang, p.supportedLanguages) if langCode != "" && !lo.Contains(updatedLangs, langCode) { updatedLangs = append(updatedLangs, langCode) } @@ -502,30 +493,23 @@ func isoToLingua(code string, languages map[lingua.Language]string) (lingua.Lang // Handle post processing logic func (p *PostProcessor) processPost(evt *atproto.SyncSubscribeRepos_Commit, op *atproto.SyncSubscribeRepos_RepoOp, record *appbsky.FeedPost) error { + // Get URI uri := fmt.Sprintf("at://%s/%s", evt.Repo, op.Path) - // 1. Check word count first (cheapest operation - just string splitting) words := strings.Fields(record.Text) if len(words) < 4 { - log.Debugf("Skipping short post with only %d words: %s", len(words), uri) return nil } - // 3. Check letter ratio (fast character counting) if !HasEnoughLetters(record.Text) { - log.Debugf("Skipping post with insufficient letter ratio: %s", uri) return nil } - // 4. Check for repetitive patterns (string analysis) if ContainsRepetitivePattern(record.Text) { - log.Debugf("Skipping post with repetitive pattern: %s", uri) return nil } - // 5. Check for spam content (string matching) if ContainsSpamContent(record.Text) { - log.Debugf("Skipping spam post: %s", uri) return nil } @@ -614,12 +598,11 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * config: config, targetLanguages: targetLangs, supportedLanguages: supportedLangs, + languageDetector: NewLanguageDetector(targetLangs), } return &events.RepoStreamCallbacks{ RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { - atomic.AddInt64(&processedEvents, 1) - rr, err := repo.ReadRepoFromCar(context, bytes.NewReader(evt.Blocks)) if err != nil { return fmt.Errorf("failed to read repo from car: %w", err) @@ -636,8 +619,6 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * continue } - atomic.AddInt64(&processedPosts, 1) - // Get and decode record _, rec, err := rr.GetRecord(context, op.Path) if err != nil { @@ -675,11 +656,6 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * } } -// GetDetector returns the package-level detector for testing -func GetDetector() lingua.LanguageDetector { - return detector -} - // Rename and modify the function to just get supported languages func getSupportedLanguages() map[lingua.Language]string { languages := make(map[lingua.Language]string) diff --git a/frontend/App.tsx b/frontend/App.tsx index 919ceae..a669603 100644 --- a/frontend/App.tsx +++ b/frontend/App.tsx @@ -258,73 +258,6 @@ const langToName = (lang: string): string => { } }; -interface PostFirehoseProps { - post: Accessor; - className?: string; -} - -const PostFirehose: Component = ({ post, className }) => { - - // Match regex to get the profile and post id - // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 - // profile = did:plc:opkjeuzx2lego6a7gueytryu - // post = 3kcbxsslpu623 - - const bskyLink = (post: Post) => { - const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/; - const [profile, postId] = regex.exec(post.uri)!.slice(1); - return `https://bsky.app/profile/${profile}/post/${postId}`; - } - - return ( - -

Recent posts

-
- {post() ? ( -
-
-

{formatRelative(new Date(post().createdAt * 1000), new Date()) }

-

- {post().languages.map(langToName).join(", ")} -

-
-

{post().text}

- - {/* Link to post on Bluesky */} - -
- ): null} -
-
- ); -}; - -const StatisticStat = ({ - label, - value, - className, -}: { - label: string; - value: Accessor; - className?: string; -}) => { - return ( - -

{label}

-

{value()}

-

per second

-
- ); -}; - const Header = () => { return (
{ const App: Component = () => { const [key, setKey] = createSignal(); // Used to politely close the event source const [post, setPost] = createSignal(); - const [eventsPerSecond, setEventsPerSecond] = createSignal(0); - const [postsPerSecond, setPostsPerSecond] = createSignal(0); const [eventSource, setEventSource] = createSignal(null); onMount(() => { @@ -370,13 +301,6 @@ const App: Component = () => { setPost(data); }); - es.addEventListener("statistics", (e: MessageEvent) => { - const data = JSON.parse(e.data); - console.log("Received statistics", data); - setEventsPerSecond(data.eventsPerSecond); - setPostsPerSecond(data.postsPerSecond); - }); - // Add error handling es.addEventListener("error", (e) => { console.error("EventSource error:", e); @@ -412,21 +336,10 @@ const App: Component = () => { <>
- - -
); diff --git a/models/models.go b/models/models.go index 8366157..29f8452 100644 --- a/models/models.go +++ b/models/models.go @@ -45,8 +45,3 @@ type PostsAggregatedByTime struct { Time time.Time `json:"time"` Count int64 `json:"count"` } - -type StatisticsEvent struct { - EventsPerSecond int64 `json:"eventsPerSecond"` - PostsPerSecond int64 `json:"postsPerSecond"` -} diff --git a/server/server.go b/server/server.go index 0b75db1..c33bdb9 100644 --- a/server/server.go +++ b/server/server.go @@ -1,30 +1,27 @@ package server import ( - "bufio" "embed" - "encoding/json" "fmt" "net/http" "norsky/db" "norsky/feeds" - "norsky/models" "strconv" "strings" - "sync" "time" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/adaptor" "github.com/gofiber/fiber/v2/middleware/cache" "github.com/gofiber/fiber/v2/middleware/compress" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/filesystem" "github.com/gofiber/fiber/v2/middleware/requestid" - "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" - "github.com/valyala/fasthttp" ) //go:embed dist/* @@ -38,100 +35,29 @@ type ServerConfig struct { // The reader to use for reading posts Reader *db.Reader - // Broadcast channels to pass posts to SSE clients - Broadcaster *Broadcaster - // Add feeds to config Feeds map[string]feeds.Feed } -// Make it sync -type Broadcaster struct { - sync.RWMutex - createPostClients map[string]chan models.CreatePostEvent - statisticsClients map[string]chan models.StatisticsEvent -} - -// Constructor -func NewBroadcaster() *Broadcaster { - return &Broadcaster{ - createPostClients: make(map[string]chan models.CreatePostEvent, 10000), - statisticsClients: make(map[string]chan models.StatisticsEvent, 10000), - } -} - -func (b *Broadcaster) BroadcastCreatePost(post models.CreatePostEvent) { - for id, client := range b.createPostClients { - select { - case client <- post: // Non-blocking send - default: - log.Warnf("Client channel full, skipping stats for client: %v", id) - } - } -} - -func (b *Broadcaster) BroadcastStatistics(stats models.StatisticsEvent) { - b.RLock() // Assuming you have a mutex for client safety - defer b.RUnlock() - - for id, client := range b.statisticsClients { - select { - case client <- stats: // Non-blocking send - default: - log.Warnf("Client channel full, skipping stats for client: %v", id) - } - } -} - -// Function to add a client to the broadcaster -func (b *Broadcaster) AddClient(key string, createPostClient chan models.CreatePostEvent, statisticsClient chan models.StatisticsEvent) { - b.Lock() - defer b.Unlock() - b.createPostClients[key] = createPostClient - b.statisticsClients[key] = statisticsClient - log.WithFields(log.Fields{ - "key": key, - "count": len(b.createPostClients), - }).Info("Adding client to broadcaster") -} - -// Function to remove a client from the broadcaster -func (b *Broadcaster) RemoveClient(key string) { - b.Lock() - defer b.Unlock() - - // Remove from createPostClients - if client, ok := b.createPostClients[key]; ok { // Check if the client exists - close(client) // Safely close the channel - delete(b.createPostClients, key) // Remove from the map - } - - // Remove from statisticsClients - if client, ok := b.statisticsClients[key]; ok { // Check if the client exists - close(client) // Safely close the channel - delete(b.statisticsClients, key) // Remove from the map - } - - log.WithFields(log.Fields{ - "key": key, - "count": len(b.createPostClients), - }).Info("Removed client from broadcaster") -} +var ( + // Define custom metrics + customPostsProcessed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "norsky_posts_processed_total", + Help: "Total number of posts processed by language", + }, + []string{"language"}, + ) +) -func (b *Broadcaster) Shutdown() { - log.Info("Shutting down broadcaster") - b.Lock() - defer b.Unlock() - for _, client := range b.createPostClients { - close(client) - } +func init() { + // Register custom metrics + prometheus.MustRegister(customPostsProcessed) } // Returns a fiber.App instance to be used as an HTTP server for the norsky feed func Server(config *ServerConfig) *fiber.App { - bc := config.Broadcaster - app := fiber.New() // Middleware to track the latency of each request @@ -299,105 +225,6 @@ func Server(config *ServerConfig) *fiber.App { return c.Status(200).JSON(postsPerTime) }) - app.Delete("/dashboard/feed/sse", func(c *fiber.Ctx) error { - // Get the feed query parameters and parse the limit - key := c.Query("key", "") - bc.RemoveClient(key) - return c.Status(200).SendString("OK") - }) - - app.Get("/dashboard/feed/sse", func(c *fiber.Ctx) error { - c.Set("Content-Type", "text/event-stream") - c.Set("Cache-Control", "no-cache") - c.Set("Connection", "keep-alive") - c.Set("Transfer-Encoding", "chunked") - - // Unique client key - key := uuid.New().String() - sseCreatePostChannel := make(chan models.CreatePostEvent, 10) // Buffered channel - sseStatisticsChannel := make(chan models.StatisticsEvent, 10) - aliveChan := time.NewTicker(5 * time.Second) - - defer aliveChan.Stop() - - // Register the client - bc.AddClient(key, sseCreatePostChannel, sseStatisticsChannel) - - // Cleanup function - cleanup := func() { - log.Infof("Cleaning up SSE stream for client: %s", key) - bc.RemoveClient(key) - } - - // Use StreamWriter to manage SSE streaming - c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - defer cleanup() - - // Send initial event with client key - fmt.Fprintf(w, "event: init\ndata: %s\n\n", key) - if err := w.Flush(); err != nil { - log.Errorf("Failed to send init event: %v", err) - return - } - - // Start streaming loop - for { - select { - case <-aliveChan.C: - // Send keep-alive pings - if _, err := fmt.Fprintf(w, "event: ping\ndata: \n\n"); err != nil { - log.Warnf("Failed to send ping to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush ping for client %s: %v", key, err) - return - } - - case post, ok := <-sseCreatePostChannel: - if !ok { - log.Warnf("CreatePostChannel closed for client %s", key) - return - } - jsonPost, err := json.Marshal(post.Post) - if err != nil { - log.Errorf("Error marshalling post for client %s: %v", key, err) - continue - } - if _, err := fmt.Fprintf(w, "event: create-post\ndata: %s\n\n", jsonPost); err != nil { - log.Warnf("Failed to send create-post event to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush create-post event for client %s: %v", key, err) - return - } - - case stats, ok := <-sseStatisticsChannel: - if !ok { - log.Warnf("StatisticsChannel closed for client %s", key) - return - } - jsonStats, err := json.Marshal(stats) - if err != nil { - log.Errorf("Error marshalling stats for client %s: %v", key, err) - continue - } - if _, err := fmt.Fprintf(w, "event: statistics\ndata: %s\n\n", jsonStats); err != nil { - log.Warnf("Failed to send statistics event to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush statistics event for client %s: %v", key, err) - return - } - } - } - })) - - return nil - }) - // Serve the Solid dashboard app.Use("/", filesystem.New(filesystem.Config{ Browse: false, @@ -406,5 +233,8 @@ func Server(config *ServerConfig) *fiber.App { PathPrefix: "/dist", })) + // Add Prometheus metrics endpoint + app.Get("/metrics", adaptor.HTTPHandler(promhttp.Handler())) + return app }