Skip to content
This repository has been archived by the owner on Feb 22, 2024. It is now read-only.

Feat/bread/stability #9

Merged
merged 13 commits into from
Oct 10, 2023
Merged
4 changes: 4 additions & 0 deletions cmd/irc-reader/example.config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
loglevel: info

kube:
namespace: default
oauthsecret: twitch-irc-oauth

ratelimit:
join: 20
auth: 20
Expand Down
69 changes: 61 additions & 8 deletions internal/aggregator/emote.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/seventv/api/data/model"
"go.uber.org/zap"

"github.com/seventv/7tv-bot/pkg/types"
)
Expand Down Expand Up @@ -53,10 +54,29 @@ var mx = sync.Mutex{}

type emoteCache struct {
expires time.Time
emotes []model.ActiveEmoteModel
emotes []types.Emote
}

func getEmotesForChannel(channelID string) ([]model.ActiveEmoteModel, error) {
func initCache() {
go func() {
for range time.Tick(1 * time.Minute) {
cleanCache()
}
}()
}

func cleanCache() {
mx.Lock()
defer mx.Unlock()

for channelID, cache := range activeEmotesCache {
if time.Since(cache.expires) > 0 {
delete(activeEmotesCache, channelID)
}
}
}

func getEmotesForChannel(channelID string) ([]types.Emote, error) {
mx.Lock()
defer mx.Unlock()

Expand All @@ -66,19 +86,35 @@ func getEmotesForChannel(channelID string) ([]model.ActiveEmoteModel, error) {
return cache.emotes, nil
}
}
emotes, err := getEmotesByChannelId(channelID)
response, err := getEmotesByChannelId(channelID)
if err != nil {
if errors.Is(err, ErrEmotesNotEnabled) {
// set empty slice, so we don't spam the API with requests in the future
activeEmotesCache[channelID] = emoteCache{
emotes: []model.ActiveEmoteModel{},
emotes: []types.Emote{},
expires: time.Now().Add(5 * time.Minute),
}
return []model.ActiveEmoteModel{}, nil
return []types.Emote{}, nil
}
return nil, err
}

// convert response to save some memory
var emotes []types.Emote
for _, emote := range response {
if emote.Data == nil {
zap.S().Infof("emote %v has no data field, skipping", emote.Name)
continue
}
emotes = append(emotes, types.Emote{
Name: emote.Name,
EmoteID: emote.ID,
Flags: emote.Flags,
State: emote.Data.State,
URL: emote.Data.Host.URL,
})
}

activeEmotesCache[channelID] = emoteCache{
emotes: emotes,
expires: time.Now().Add(5 * time.Minute),
Expand Down Expand Up @@ -117,13 +153,13 @@ func getEmotesByChannelId(channelID string) ([]model.ActiveEmoteModel, error) {
}

if userModel.EmoteSet == nil {
return nil, ErrIncompleteResponse
return nil, ErrEmotesNotEnabled
}

return userModel.EmoteSet.Emotes, nil
}

func getGlobalEmotes() ([]model.ActiveEmoteModel, error) {
func getGlobalEmotes() ([]types.Emote, error) {
mx.Lock()
defer mx.Unlock()

Expand All @@ -134,10 +170,27 @@ func getGlobalEmotes() ([]model.ActiveEmoteModel, error) {
}
}

emotes, err := requestGlobalEmotes()
response, err := requestGlobalEmotes()
if err != nil {
return nil, err
}

// convert response to save some memory
var emotes []types.Emote
for _, emote := range response {
if emote.Data == nil {
zap.S().Infof("emote %v has no data field, skipping", emote.Name)
continue
}
emotes = append(emotes, types.Emote{
Name: emote.Name,
EmoteID: emote.ID,
Flags: emote.Flags,
State: emote.Data.State,
URL: emote.Data.Host.URL,
})
}

activeEmotesCache["global"] = emoteCache{
expires: time.Now().Add(5 * time.Minute),
emotes: emotes,
Expand Down
4 changes: 2 additions & 2 deletions internal/aggregator/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ func (s *Service) ensureStream(js nats.JetStreamContext) error {
cfg := &nats.StreamConfig{
Name: s.cfg.Nats.Stream,
Subjects: []string{s.cfg.Nats.Topic.Raw + ".>"},
MaxAge: 12 * time.Hour,
MaxAge: 1 * time.Hour,
Retention: nats.InterestPolicy,
Discard: nats.DiscardNew,
// TODO: 0 seconds sets this to default value (2 min), find optimal value for our case
Duplicates: 0 * time.Second,
Duplicates: 1 * time.Minute,
}

_, err := js.StreamInfo(cfg.Name)
Expand Down
2 changes: 2 additions & 0 deletions internal/aggregator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ func (s *Service) Init() error {
},
)
emotedb.SetCollections(coll)

initCache()
return s.subscribe(context.TODO(), s.handleMessage)
}
4 changes: 4 additions & 0 deletions internal/irc-reader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type Config struct {
LogLevel string
Replicas int

Kube struct {
Namespace string
Oauthsecret string
}
RateLimit struct {
Join int64
Auth int64
Expand Down
76 changes: 76 additions & 0 deletions internal/irc-reader/kube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package irc_reader

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var (
ErrNoOAuthToken = errors.New("no OAuth token found in kubernetes secret data")
ErrNoSecret = errors.New("no secret found")
)

func (c *Controller) kubeInit() error {
config, err := rest.InClusterConfig()
if err != nil {
return err
}
c.kube, err = kubernetes.NewForConfig(config)
return err
}

func (c *Controller) watchKube(ctx context.Context, cb func() error) error {
watcher, err := c.kube.CoreV1().Secrets(c.cfg.Kube.Namespace).Watch(
ctx,
metav1.SingleObject(metav1.ObjectMeta{
Name: c.cfg.Kube.Oauthsecret,
}),
)
if err != nil {
return err
}
go func() {
for range watcher.ResultChan() {
err = cb()
if err != nil {
zap.S().Infow("failed to update OAuth token from kubernetes secret", err)
}
}
}()
return nil
}

func (c *Controller) updateOauthFromKubeSecret() error {
oauth, err := c.getOauthFromKubeSecret(context.Background())
if err != nil {
return err
}
c.twitch.UpdateOauth(oauth)
zap.S().Info("updated OAuth token from kubernetes secret")
return nil
}

func (c *Controller) getOauthFromKubeSecret(ctx context.Context) (string, error) {
secret, err := c.kube.CoreV1().Secrets(c.cfg.Kube.Namespace).Get(
ctx,
c.cfg.Kube.Oauthsecret,
metav1.GetOptions{},
)
if err != nil {
return "", err
}
if secret == nil {
return "", ErrNoSecret
}
data, ok := secret.Data["access-token"]
if !ok || len(data) == 0 {
return "", ErrNoOAuthToken
}
return fmt.Sprintf("oauth:%v", string(data)), nil
}
7 changes: 3 additions & 4 deletions internal/irc-reader/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package irc_reader
import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/nats-io/nats.go"
Expand All @@ -19,12 +18,12 @@ import (
func (c *Controller) ensureStream(js nats.JetStreamContext) error {
cfg := &nats.StreamConfig{
Name: c.cfg.Nats.Stream,
Subjects: []string{fmt.Sprintf("%v.>", c.cfg.Nats.Topic.Raw)},
MaxAge: 12 * time.Hour,
Subjects: []string{c.cfg.Nats.Topic.Raw + ".>"},
MaxAge: 1 * time.Hour,
Retention: nats.InterestPolicy,
Discard: nats.DiscardNew,
// TODO: 0 seconds sets this to default value (2 min), find optimal value for our case
Duplicates: 0 * time.Second,
Duplicates: 1 * time.Minute,
}

_, err := js.StreamInfo(cfg.Name)
Expand Down
28 changes: 27 additions & 1 deletion internal/irc-reader/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"

"github.com/seventv/7tv-bot/internal/database"
"github.com/seventv/7tv-bot/internal/irc-reader/config"
Expand All @@ -19,6 +20,7 @@ import (
type Controller struct {
cfg *config.Config
jetStream nats.JetStreamContext
kube *kubernetes.Clientset
twitch *manager.IRCManager

shardID int
Expand Down Expand Up @@ -59,8 +61,21 @@ func (c *Controller) Init() error {
return err
}

err = c.kubeInit()
if err != nil {
return err
}

oauth := c.cfg.Twitch.Oauth
if oauth == "" {
oauth, err = c.getOauthFromKubeSecret(context.Background())
if err != nil {
return err
}
}

// initialize twitch IRC manager with ratelimit
c.twitch = manager.New(c.cfg.Twitch.User, c.cfg.Twitch.Oauth).
c.twitch = manager.New(c.cfg.Twitch.User, oauth).
WithLimit(ratelimit.New(
redisClient,
c.cfg.RateLimit.Join,
Expand All @@ -70,9 +85,20 @@ func (c *Controller) Init() error {

// watch for config changes to OAuth
config.OnChange = func() {
if c.cfg.Twitch.Oauth == "" {
return
}
c.twitch.UpdateOauth(c.cfg.Twitch.Oauth)
}

// watch for changes to OAuth in kubernetes secret
if c.cfg.Kube.Oauthsecret != "" {
err = c.watchKube(context.Background(), c.updateOauthFromKubeSecret)
if err != nil {
return err
}
}

// feed back twitch channels that got disconnected to the IRC
go c.handleOrphanedChannels()

Expand Down
13 changes: 5 additions & 8 deletions pkg/database/emotes/emotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@ import (
)

func IncrementEmote(ctx context.Context, emote types.CountedEmote) error {
if emote.Emote.Data == nil {
return ErrMissingData
}
res, err := collections.GlobalStats.UpdateOne(
ctx,
bson.D{{"emote_id", emote.Emote.ID}},
bson.D{{"emote_id", emote.Emote.EmoteID}},
bson.M{"$setOnInsert": EmoteCount{
Name: emote.Emote.Name,
EmoteID: emote.Emote.ID,
EmoteID: emote.Emote.EmoteID,
Flags: emote.Emote.Flags,
State: emote.Emote.Data.State,
URL: emote.Emote.Data.Host.URL,
State: emote.Emote.State,
URL: emote.Emote.URL,
CreatedAt: time.Now().UTC(),
UpdatedAt: time.Now().UTC(),
Count: emote.Count,
Expand All @@ -37,7 +34,7 @@ func IncrementEmote(ctx context.Context, emote types.CountedEmote) error {

_, err = collections.GlobalStats.UpdateOne(
ctx,
bson.D{{"emote_id", emote.Emote.ID}},
bson.D{{"emote_id", emote.Emote.EmoteID}},
bson.M{
"$inc": bson.M{
"count": emote.Count,
Expand Down
15 changes: 13 additions & 2 deletions pkg/types/emote.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package types

import "github.com/seventv/api/data/model"
import (
"github.com/seventv/api/data/model"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type CountedEmote struct {
Count int
Emote model.ActiveEmoteModel
Emote Emote
}

type Emote struct {
Name string `bson:"name"`
EmoteID primitive.ObjectID `bson:"emote_id"`
Flags model.ActiveEmoteFlagModel `bson:"flags"`
State []model.EmoteVersionState `bson:"state,omitempty"`
URL string `bson:"url"`
}
Loading