diff --git a/docker-compose.yml b/docker-compose.yml index c1e1e7d3..9e3d6816 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,7 +20,7 @@ services: fresh -c scripts/fresh-runner.conf environment: - OTR_MONGO_URL=mongodb://mongo/dev - - OTR_REDIS_URL=redis://redis,redis://redis-sentinel-master + - OTR_REDIS_URL=redis://redis,redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster - OTR_LOG_DEBUG=true ports: - 9000:9000 diff --git a/integration-tests/acceptance/docker-compose.yml b/integration-tests/acceptance/docker-compose.yml index db0bf8f9..04ff4fa6 100644 --- a/integration-tests/acceptance/docker-compose.yml +++ b/integration-tests/acceptance/docker-compose.yml @@ -39,7 +39,7 @@ services: dockerfile: ${OTR_DOCKERFILE} environment: - OTR_MONGO_URL=mongodb://mongo/tests - - OTR_REDIS_URL=redis://redis-sentinel:26379,redis://redis + - OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis - OTR_LOG_DEBUG=true - OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true depends_on: diff --git a/integration-tests/performance/docker-compose.yml b/integration-tests/performance/docker-compose.yml index 38c03978..d09bf424 100644 --- a/integration-tests/performance/docker-compose.yml +++ b/integration-tests/performance/docker-compose.yml @@ -37,7 +37,7 @@ services: build: ../.. environment: - OTR_MONGO_URL=mongodb://mongo/tests - - OTR_REDIS_URL=redis://redis-sentinel:26379,redis://redis + - OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis depends_on: mongo: condition: service_healthy diff --git a/lib/parse/main.go b/lib/parse/main.go new file mode 100644 index 00000000..ac054994 --- /dev/null +++ b/lib/parse/main.go @@ -0,0 +1,98 @@ +package parse + +import ( + "net/url" + "regexp" + "strconv" + "strings" + + "github.com/go-redis/redis/v8" + "github.com/pkg/errors" +) + +// parseRedisURL converts an url string, that may be a redis connection string, +// or may be a sentinel protocol pseudo-url, into a set of redis connection options. +func ParseRedisURL(url string, isSentinel bool) (*redis.UniversalOptions, error) { + if isSentinel { + opts, err := parseSentinelURL(url) + return opts, err + } + + parsedRedisURL, err := redis.ParseURL(url) + if err != nil { + return nil, errors.Wrap(err, "Error parsing Redis URL") + } + + // non-sentinel redis does not use MasterName, so leave it as "" + return &redis.UniversalOptions{ + Addrs: []string{parsedRedisURL.Addr}, + DB: parsedRedisURL.DB, + Password: parsedRedisURL.Password, + TLSConfig: parsedRedisURL.TLSConfig, + }, nil + +} + +// match against redis-sentinel://[something@]something[/db] +var urlMatcher *regexp.Regexp = regexp.MustCompile(`redis-sentinel:\/\/(([^@]+)@)?([^/]+)(\/(\d+))?`) + +// match against host:port +var endpointMatcher *regexp.Regexp = regexp.MustCompile(`([^:]+):(\d+)`) + +// parseSentinelURL converts a sentinel protocol pseudo-url into a set of redis connection numbers. +// we expect sentinel urls to be of the form redis-sentinel://[password@]host:port[,host2:port2][,hostN:portN][/db][?sentinelMasterId=name] +// because of the redis-sentinel:// protocol, the protocol cannot be rediss:// +// and therefore there cannot be any tls config for the options returned from this function +func parseSentinelURL(urlString string) (*redis.UniversalOptions, error) { + // the comma-separated list of host:port pairs means this is not a true url, and so must be parsed manually + + // parse query params + queryIdx := strings.Index(urlString, "?") + base := urlString + query := "" + if queryIdx >= 0 { + base = urlString[0:queryIdx] + query = urlString[queryIdx+1:] + } + queryParams, err := url.ParseQuery(query) + if err != nil { + return nil, err + } + sentinelMasterId, ok := queryParams["sentinelMasterId"] + var name string = "" + if ok { + name = sentinelMasterId[0] + } + + // parse base url + match := urlMatcher.FindStringSubmatch(base) + if match == nil || match[0] != base { + return nil, errors.New("Redis Sentinel URL did not conform to schema") + } + password := match[2] + endpoints := match[3] + dbStr := match[5] + // db is optional + db := 0 + if dbStr != "" { + db, err = strconv.Atoi(dbStr) + if err != nil { + return nil, errors.Wrap(err, "Redis Sentinel URL DB is NaN") + } + } + + // check endpoints parse + endpointsList := strings.Split(endpoints, ",") + for _, endpoint := range endpointsList { + if endpointMatcher.FindString(endpoint) != endpoint { + return nil, errors.New("Redis Sentinel URL Endpoints List did not conform to schema") + } + } + + return &redis.UniversalOptions{ + Password: password, + Addrs: endpointsList, + MasterName: name, + DB: db, + }, nil +} diff --git a/main.go b/main.go index a0bed861..bcc67a47 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ import ( "net/http" "os" "os/signal" - "sync" "strings" + "sync" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -18,6 +18,7 @@ import ( "github.com/tulip/oplogtoredis/lib/config" "github.com/tulip/oplogtoredis/lib/log" "github.com/tulip/oplogtoredis/lib/oplog" + "github.com/tulip/oplogtoredis/lib/parse" "github.com/tulip/oplogtoredis/lib/redispub" "github.com/go-redis/redis/v8" @@ -81,10 +82,10 @@ func main() { waitGroup.Add(1) go func() { tailer := oplog.Tailer{ - MongoClient: mongoSession, + MongoClient: mongoSession, RedisClients: redisClients, - RedisPrefix: config.RedisMetadataPrefix(), - MaxCatchUp: config.MaxCatchUp(), + RedisPrefix: config.RedisMetadataPrefix(), + MaxCatchUp: config.MaxCatchUp(), } tailer.Tail(redisPubs, stopOplogTail) @@ -189,29 +190,11 @@ func createRedisClients() ([]redis.UniversalClient, error) { var ret []redis.UniversalClient for _, url := range config.RedisURL() { - parsedRedisURL, err := redis.ParseURL(url) - log.Log.Info("Parsed redis url: ", url) + clientOptions, err := parse.ParseRedisURL(url, strings.HasPrefix(url, "redis-sentinel://")) if err != nil { return nil, errors.Wrap(err, "parsing redis url") } - var clientOptions redis.UniversalOptions - - if !strings.Contains(url, "sentinel") { - clientOptions = redis.UniversalOptions{ - Addrs: []string{parsedRedisURL.Addr}, - DB: parsedRedisURL.DB, - Password: parsedRedisURL.Password, - TLSConfig: parsedRedisURL.TLSConfig, - } - }else{ - clientOptions = redis.UniversalOptions{ - Addrs: []string{parsedRedisURL.Addr}, - DB: parsedRedisURL.DB, - Password: parsedRedisURL.Password, - TLSConfig: parsedRedisURL.TLSConfig, - MasterName: "mymaster", - } - } + log.Log.Info("Parsed redis url: ", clientOptions) if clientOptions.TLSConfig != nil { clientOptions.TLSConfig = &tls.Config{ @@ -219,7 +202,7 @@ func createRedisClients() ([]redis.UniversalClient, error) { MinVersion: tls.VersionTLS12, } } - client := redis.NewUniversalClient(&clientOptions) + client := redis.NewUniversalClient(clientOptions) _, err = client.Ping(context.Background()).Result() if err != nil { return nil, errors.Wrap(err, "pinging redis") @@ -227,8 +210,6 @@ func createRedisClients() ([]redis.UniversalClient, error) { ret = append(ret, client) } - - return ret, nil } @@ -237,13 +218,13 @@ func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http. mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { redisOK := true - for _,redis := range clients { + for _, redis := range clients { redisErr := redis.Ping(context.Background()).Err() - redisOK = (redisOK && (redisErr == nil)) - if !redisOK { - log.Log.Errorw("Error connecting to Redis during healthz check", - "error", redisErr) - } + redisOK = (redisOK && (redisErr == nil)) + if !redisOK { + log.Log.Errorw("Error connecting to Redis during healthz check", + "error", redisErr) + } } ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())