Skip to content

Commit

Permalink
Support redis cluster
Browse files Browse the repository at this point in the history
- Added `--redis-cluster-nodes` flag
- Display cluster information in redis info page
  • Loading branch information
hibiken committed Sep 7, 2021
1 parent 0082155 commit ce5c86e
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 118 deletions.
91 changes: 61 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/go-redis/redis/v8"
Expand All @@ -20,13 +21,14 @@ import (

// Command-line flags
var (
flagPort int
flagRedisAddr string
flagRedisDB int
flagRedisPassword string
flagRedisTLS string
flagRedisURL string
flagRedisInsecureTLS bool
flagPort int
flagRedisAddr string
flagRedisDB int
flagRedisPassword string
flagRedisTLS string
flagRedisURL string
flagRedisInsecureTLS bool
flagRedisClusterNodes string
)

func init() {
Expand All @@ -36,7 +38,8 @@ func init() {
flag.StringVar(&flagRedisPassword, "redis-password", "", "password to use when connecting to redis server")
flag.StringVar(&flagRedisTLS, "redis-tls", "", "server name for TLS validation used when connecting to redis server")
flag.StringVar(&flagRedisURL, "redis-url", "", "URL to redis server")
flag.BoolVar(&flagRedisInsecureTLS, "redis-insecure-tls", false, "Disable TLS certificate host checks")
flag.BoolVar(&flagRedisInsecureTLS, "redis-insecure-tls", false, "disable TLS certificate host checks")
flag.StringVar(&flagRedisClusterNodes, "redis-cluster-nodes", "", "comma separated list of host:port addresses of cluster nodes")
}

// staticFileServer implements the http.Handler interface, so we can use it
Expand Down Expand Up @@ -88,20 +91,26 @@ func (srv *staticFileServer) indexFilePath() string {
return filepath.Join(srv.staticDirPath, srv.indexFileName)
}

func getRedisOptionsFromFlags() (*redis.Options, error) {
var err error
var opts *redis.Options
func getRedisOptionsFromFlags() (*redis.UniversalOptions, error) {
var opts redis.UniversalOptions

if flagRedisURL != "" {
opts, err = redis.ParseURL(flagRedisURL)
if err != nil {
return nil, err
}
if flagRedisClusterNodes != "" {
opts.Addrs = strings.Split(flagRedisClusterNodes, ",")
opts.Password = flagRedisPassword
} else {
opts = &redis.Options{
Addr: flagRedisAddr,
DB: flagRedisDB,
Password: flagRedisPassword,
if flagRedisURL != "" {
res, err := redis.ParseURL(flagRedisURL)
if err != nil {
return nil, err
}
opts.Addrs = append(opts.Addrs, res.Addr)
opts.DB = res.DB
opts.Password = res.Password

} else {
opts.Addrs = []string{flagRedisAddr}
opts.DB = flagRedisDB
opts.Password = flagRedisPassword
}
}

Expand All @@ -114,7 +123,7 @@ func getRedisOptionsFromFlags() (*redis.Options, error) {
}
opts.TLSConfig.InsecureSkipVerify = true
}
return opts, nil
return &opts, nil
}

//go:embed ui/build/*
Expand All @@ -128,16 +137,34 @@ func main() {
log.Fatal(err)
}

inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: opts.Addr,
DB: opts.DB,
Password: opts.Password,
TLSConfig: opts.TLSConfig,
})
useRedisCluster := flagRedisClusterNodes != ""

var redisConnOpt asynq.RedisConnOpt
if useRedisCluster {
redisConnOpt = asynq.RedisClusterClientOpt{
Addrs: opts.Addrs,
Password: opts.Password,
TLSConfig: opts.TLSConfig,
}
} else {
redisConnOpt = asynq.RedisClientOpt{
Addr: opts.Addrs[0],
DB: opts.DB,
Password: opts.Password,
TLSConfig: opts.TLSConfig,
}
}

inspector := asynq.NewInspector(redisConnOpt)
defer inspector.Close()

rdb := redis.NewClient(opts)
defer rdb.Close()
var redisClient redis.UniversalClient
if useRedisCluster {
redisClient = redis.NewClusterClient(opts.Cluster())
} else {
redisClient = redis.NewClient(opts.Simple())
}
defer redisClient.Close()

router := mux.NewRouter()
router.Use(loggingMiddleware)
Expand Down Expand Up @@ -207,7 +234,11 @@ func main() {
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")

// Redis info endpoint.
api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(rdb)).Methods("GET")
if useRedisCluster {
api.HandleFunc("/redis_info", newRedisClusterInfoHandlerFunc(redisClient.(*redis.ClusterClient), inspector)).Methods("GET")
} else {
api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(redisClient.(*redis.Client))).Methods("GET")
}

fs := &staticFileServer{
contents: staticContents,
Expand Down
72 changes: 69 additions & 3 deletions redis_info_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/go-redis/redis/v8"
"github.com/hibiken/asynq"
)

// ****************************************************************************
Expand All @@ -18,21 +19,86 @@ type RedisInfoResponse struct {
Addr string `json:"address"`
Info map[string]string `json:"info"`
RawInfo string `json:"raw_info"`
Cluster bool `json:"cluster"`

// Following fields are only set when connected to redis cluster.
RawClusterNodes string `json:"raw_cluster_nodes"`
QueueLocations []*QueueLocationInfo `json:"queue_locations"`
}

type QueueLocationInfo struct {
Queue string `json:"queue"` // queue name
KeySlot int64 `json:"keyslot"` // cluster key slot for the queue
Nodes []string `json:"nodes"` // list of cluster node addresses
}

func newRedisInfoHandlerFunc(rdb *redis.Client) http.HandlerFunc {
func newRedisInfoHandlerFunc(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
res, err := rdb.Info(ctx).Result()
res, err := client.Info(ctx).Result()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
info := parseRedisInfo(res)
resp := RedisInfoResponse{
Addr: rdb.Options().Addr,
Addr: client.Options().Addr,
Info: info,
RawInfo: res,
Cluster: false,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

func newRedisClusterInfoHandlerFunc(client *redis.ClusterClient, inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
rawClusterInfo, err := client.ClusterInfo(ctx).Result()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
info := parseRedisInfo(rawClusterInfo)
rawClusterNodes, err := client.ClusterNodes(ctx).Result()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
queues, err := inspector.Queues()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var queueLocations []*QueueLocationInfo
for _, qname := range queues {
q := QueueLocationInfo{Queue: qname}
q.KeySlot, err = inspector.ClusterKeySlot(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
nodes, err := inspector.ClusterNodes(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for _, n := range nodes {
q.Nodes = append(q.Nodes, n.Addr)
}
queueLocations = append(queueLocations, &q)
}

resp := RedisInfoResponse{
Addr: strings.Join(client.Options().Addrs, ","),
Info: info,
RawInfo: rawClusterInfo,
Cluster: true,
RawClusterNodes: rawClusterNodes,
QueueLocations: queueLocations,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
12 changes: 12 additions & 0 deletions ui/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ export interface RedisInfoResponse {
address: string;
info: RedisInfo;
raw_info: string;
cluster: boolean;

// following fields are set only when cluster=true
raw_cluster_nodes: string;
queue_locations: QueueLocation[] | null;
}

// Describes location of a queue in cluster.
export interface QueueLocation {
queue: string; // queue name
keyslot: number; // cluster keyslot
nodes: string[]; // node addresses
}

// Return value from redis INFO command.
Expand Down
48 changes: 48 additions & 0 deletions ui/src/components/QueueLocationTable.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import React from "react";
import { makeStyles } from "@material-ui/core/styles";
import Table from "@material-ui/core/Table";
import TableBody from "@material-ui/core/TableBody";
import TableCell from "@material-ui/core/TableCell";
import TableContainer from "@material-ui/core/TableContainer";
import TableHead from "@material-ui/core/TableHead";
import TableRow from "@material-ui/core/TableRow";
import { QueueLocation } from "../api";

const useStyles = makeStyles((theme) => ({
table: {
minWidth: 650,
},
}));

interface Props {
queueLocations: QueueLocation[];
}

export default function QueueLocationTable(props: Props) {
const classes = useStyles();

return (
<TableContainer>
<Table className={classes.table} aria-label="queue location table">
<TableHead>
<TableRow>
<TableCell>Queue</TableCell>
<TableCell>KeySlot</TableCell>
<TableCell>Node Addresses</TableCell>
</TableRow>
</TableHead>
<TableBody>
{props.queueLocations.map((loc) => (
<TableRow key={loc.queue}>
<TableCell component="th" scope="row">
{loc.queue}
</TableCell>
<TableCell>{loc.keyslot}</TableCell>
<TableCell>{loc.nodes.join(", ")}</TableCell>
</TableRow>
))}
</TableBody>
</Table>
</TableContainer>
);
}
11 changes: 10 additions & 1 deletion ui/src/reducers/redisInfoReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import {
GET_REDIS_INFO_SUCCESS,
RedisInfoActionTypes,
} from "../actions/redisInfoActions";
import { RedisInfo } from "../api";
import { QueueLocation, RedisInfo } from "../api";

interface RedisInfoState {
loading: boolean;
error: string;
address: string;
data: RedisInfo | null;
rawData: string | null;
cluster: boolean;
rawClusterNodes: string | null;
queueLocations: QueueLocation[] | null;
}

const initialState: RedisInfoState = {
Expand All @@ -20,6 +23,9 @@ const initialState: RedisInfoState = {
address: "",
data: null,
rawData: null,
cluster: false,
rawClusterNodes: null,
queueLocations: null,
};

export default function redisInfoReducer(
Expand Down Expand Up @@ -47,6 +53,9 @@ export default function redisInfoReducer(
address: action.payload.address,
data: action.payload.info,
rawData: action.payload.raw_info,
cluster: action.payload.cluster,
rawClusterNodes: action.payload.raw_cluster_nodes,
queueLocations: action.payload.queue_locations,
};

default:
Expand Down
Loading

0 comments on commit ce5c86e

Please sign in to comment.