Skip to content

Commit

Permalink
change redis sentinel parse schema
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-goodisman committed Dec 20, 2023
1 parent 71b3f1e commit 45a34f3
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
fresh -c scripts/fresh-runner.conf
environment:
- OTR_MONGO_URL=mongodb://mongo/dev
- OTR_REDIS_URL=redis://redis,redis://redis-sentinel-master
- OTR_REDIS_URL=redis://redis,redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster
- OTR_LOG_DEBUG=true
ports:
- 9000:9000
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/acceptance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ services:
dockerfile: ${OTR_DOCKERFILE}
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis://redis-sentinel:26379,redis://redis
- OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis
- OTR_LOG_DEBUG=true
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/performance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ services:
build: ../..
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis://redis-sentinel:26379,redis://redis
- OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis
depends_on:
mongo:
condition: service_healthy
Expand Down
98 changes: 98 additions & 0 deletions lib/parse/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package parse

import (
"net/url"
"regexp"
"strconv"
"strings"

"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)

// parseRedisURL converts an url string, that may be a redis connection string,
// or may be a sentinel protocol pseudo-url, into a set of redis connection options.
func ParseRedisURL(url string, isSentinel bool) (*redis.UniversalOptions, error) {
if isSentinel {
opts, err := parseSentinelURL(url)
return opts, err
}

parsedRedisURL, err := redis.ParseURL(url)
if err != nil {
return nil, errors.Wrap(err, "Error parsing Redis URL")
}

// non-sentinel redis does not use MasterName, so leave it as ""
return &redis.UniversalOptions{
Addrs: []string{parsedRedisURL.Addr},
DB: parsedRedisURL.DB,
Password: parsedRedisURL.Password,
TLSConfig: parsedRedisURL.TLSConfig,
}, nil

}

// match against redis-sentinel://[something@]something[/db]
var urlMatcher *regexp.Regexp = regexp.MustCompile(`redis-sentinel:\/\/(([^@]+)@)?([^/]+)(\/(\d+))?`)

// match against host:port
var endpointMatcher *regexp.Regexp = regexp.MustCompile(`([^:]+):(\d+)`)

// parseSentinelURL converts a sentinel protocol pseudo-url into a set of redis connection numbers.
// we expect sentinel urls to be of the form redis-sentinel://[password@]host:port[,host2:port2][,hostN:portN][/db][?sentinelMasterId=name]
// because of the redis-sentinel:// protocol, the protocol cannot be rediss://
// and therefore there cannot be any tls config for the options returned from this function
func parseSentinelURL(urlString string) (*redis.UniversalOptions, error) {
// the comma-separated list of host:port pairs means this is not a true url, and so must be parsed manually

// parse query params
queryIdx := strings.Index(urlString, "?")
base := urlString
query := ""
if queryIdx >= 0 {
base = urlString[0:queryIdx]
query = urlString[queryIdx+1:]
}
queryParams, err := url.ParseQuery(query)
if err != nil {
return nil, err
}
sentinelMasterId, ok := queryParams["sentinelMasterId"]
var name string = ""
if ok {
name = sentinelMasterId[0]
}

// parse base url
match := urlMatcher.FindStringSubmatch(base)
if match == nil || match[0] != base {
return nil, errors.New("Redis Sentinel URL did not conform to schema")
}
password := match[2]
endpoints := match[3]
dbStr := match[5]
// db is optional
db := 0
if dbStr != "" {
db, err = strconv.Atoi(dbStr)
if err != nil {
return nil, errors.Wrap(err, "Redis Sentinel URL DB is NaN")
}
}

// check endpoints parse
endpointsList := strings.Split(endpoints, ",")
for _, endpoint := range endpointsList {
if endpointMatcher.FindString(endpoint) != endpoint {
return nil, errors.New("Redis Sentinel URL Endpoints List did not conform to schema")
}
}

return &redis.UniversalOptions{
Password: password,
Addrs: endpointsList,
MasterName: name,
DB: db,
}, nil
}
47 changes: 14 additions & 33 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"strings"
"sync"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -18,6 +18,7 @@ import (
"github.com/tulip/oplogtoredis/lib/config"
"github.com/tulip/oplogtoredis/lib/log"
"github.com/tulip/oplogtoredis/lib/oplog"
"github.com/tulip/oplogtoredis/lib/parse"
"github.com/tulip/oplogtoredis/lib/redispub"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -81,10 +82,10 @@ func main() {
waitGroup.Add(1)
go func() {
tailer := oplog.Tailer{
MongoClient: mongoSession,
MongoClient: mongoSession,
RedisClients: redisClients,
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
}
tailer.Tail(redisPubs, stopOplogTail)

Expand Down Expand Up @@ -189,46 +190,26 @@ func createRedisClients() ([]redis.UniversalClient, error) {
var ret []redis.UniversalClient

for _, url := range config.RedisURL() {
parsedRedisURL, err := redis.ParseURL(url)
log.Log.Info("Parsed redis url: ", url)
clientOptions, err := parse.ParseRedisURL(url, strings.HasPrefix(url, "redis-sentinel://"))
if err != nil {
return nil, errors.Wrap(err, "parsing redis url")
}
var clientOptions redis.UniversalOptions

if !strings.Contains(url, "sentinel") {
clientOptions = redis.UniversalOptions{
Addrs: []string{parsedRedisURL.Addr},
DB: parsedRedisURL.DB,
Password: parsedRedisURL.Password,
TLSConfig: parsedRedisURL.TLSConfig,
}
}else{
clientOptions = redis.UniversalOptions{
Addrs: []string{parsedRedisURL.Addr},
DB: parsedRedisURL.DB,
Password: parsedRedisURL.Password,
TLSConfig: parsedRedisURL.TLSConfig,
MasterName: "mymaster",
}
}
log.Log.Info("Parsed redis url: ", clientOptions)

if clientOptions.TLSConfig != nil {
clientOptions.TLSConfig = &tls.Config{
InsecureSkipVerify: false,
MinVersion: tls.VersionTLS12,
}
}
client := redis.NewUniversalClient(&clientOptions)
client := redis.NewUniversalClient(clientOptions)
_, err = client.Ping(context.Background()).Result()
if err != nil {
return nil, errors.Wrap(err, "pinging redis")
}
ret = append(ret, client)
}



return ret, nil
}

Expand All @@ -237,13 +218,13 @@ func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http.

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
redisOK := true
for _,redis := range clients {
for _, redis := range clients {
redisErr := redis.Ping(context.Background()).Err()
redisOK = (redisOK && (redisErr == nil))
if !redisOK {
log.Log.Errorw("Error connecting to Redis during healthz check",
"error", redisErr)
}
redisOK = (redisOK && (redisErr == nil))
if !redisOK {
log.Log.Errorw("Error connecting to Redis during healthz check",
"error", redisErr)
}
}

ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
Expand Down

0 comments on commit 45a34f3

Please sign in to comment.