Skip to content

Commit

Permalink
Switch to ioredis
Browse files Browse the repository at this point in the history
  • Loading branch information
dschmidt committed Jul 13, 2023
1 parent ce725af commit 3dbaaf5
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 197 deletions.
13 changes: 0 additions & 13 deletions .yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3

This file was deleted.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@
]
},
"resolutions": {
"@types/[email protected]": "patch:@types/connect-redis@npm:0.0.18#.yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3",
"@types/eslint@^7.2.13": "^8.2.0",
"@types/react": "^17",
"@types/webpack-dev-server": "^4",
Expand Down
5 changes: 2 additions & 3 deletions packages/@uppy/companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"body-parser": "1.20.0",
"chalk": "4.1.2",
"common-tags": "1.8.2",
"connect-redis": "6.1.3",
"connect-redis": "7.1.0",
"content-disposition": "^0.5.4",
"cookie-parser": "1.4.6",
"cors": "^2.8.5",
Expand All @@ -52,6 +52,7 @@
"got": "11",
"grant": "5.4.21",
"helmet": "^4.6.0",
"ioredis": "5.3.2",
"ipaddr.js": "^2.0.1",
"jsonwebtoken": "8.5.1",
"lodash": "^4.17.21",
Expand All @@ -62,7 +63,6 @@
"ms": "2.1.3",
"node-schedule": "2.1.0",
"prom-client": "14.0.1",
"redis": "4.2.0",
"semver": "7.5.3",
"serialize-error": "^2.1.0",
"serialize-javascript": "^6.0.0",
Expand All @@ -73,7 +73,6 @@
},
"devDependencies": {
"@types/compression": "1.7.0",
"@types/connect-redis": "0.0.18",
"@types/cookie-parser": "1.4.2",
"@types/cors": "2.8.6",
"@types/eslint": "^8.2.0",
Expand Down
6 changes: 3 additions & 3 deletions packages/@uppy/companion/src/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ module.exports.app = (optionsArg = {}) => {
logger.setMaskables(getMaskableSecrets(options))

// create singleton redis client
if (options.redisUrl) {
redis.client(options)
if (options.redisOptions) {
redis.client(options.redisOptions)
}
const emitter = createEmitter(options.redisUrl, options.redisPubSubScope)
const emitter = createEmitter(options.redisOptions, options.redisPubSubScope)

const app = express()

Expand Down
4 changes: 2 additions & 2 deletions packages/@uppy/companion/src/server/emitter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ let emitter
* Used to transmit events (such as progress, upload completion) from controllers,
* such as the Google Drive 'get' controller, along to the client.
*/
module.exports = (redisUrl, redisPubSubScope) => {
module.exports = (redisOptions, redisPubSubScope) => {
if (!emitter) {
emitter = redisUrl ? redisEmitter(redisUrl, redisPubSubScope) : nodeEmitter()
emitter = redisOptions ? redisEmitter(redisPubSubScope) : nodeEmitter()
}

return emitter
Expand Down
26 changes: 17 additions & 9 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
const redis = require('redis')
const { EventEmitter } = require('node:events')

const logger = require('../logger')
const redis = require('../redis')

/**
* This module simulates the builtin events.EventEmitter but with the use of redis.
* This is useful for when companion is running on multiple instances and events need
* to be distributed across.
*/
module.exports = (redisUrl, redisPubSubScope) => {
module.exports = (redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redis.createClient({ url: redisUrl })
publisher.on('error', err => logger.error('publisher redis error', err))
const publisher = redis.client().duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err))
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

Expand Down Expand Up @@ -56,12 +56,17 @@ module.exports = (redisUrl, redisPubSubScope) => {
handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
subscriber.off('message', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
}

if (_once) removeListener(eventName, handler)
let args
try {
Expand All @@ -79,7 +84,10 @@ module.exports = (redisUrl, redisPubSubScope) => {
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
runWhenConnected(() => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

/**
Expand Down Expand Up @@ -125,7 +133,7 @@ module.exports = (redisUrl, redisPubSubScope) => {

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

Expand Down
25 changes: 7 additions & 18 deletions packages/@uppy/companion/src/server/redis.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const redis = require('redis')
const Redis = require('ioredis').default

const logger = require('./logger')

Expand All @@ -8,33 +8,22 @@ let redisClient
* A Singleton module that provides a single redis client through out
* the lifetime of the server
*
* @param {Record<string, unknown>} [opts] node-redis client options
* @param {Record<string, any>} [opts] node-redis client options
*/
function createClient (opts) {
if (!redisClient) {
// todo remove legacyMode when fixed: https://github.com/tj/connect-redis/issues/361
redisClient = redis.createClient({ ...opts, legacyMode: true })
redisClient = new Redis(opts)

redisClient.on('error', err => logger.error('redis error', err))

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
redisClient.on('error', err => logger.error('redis error', err.toString()))
}

return redisClient
}

module.exports.client = (companionOptions) => {
if (!companionOptions) {
module.exports.client = (redisOptions) => {
if (!redisOptions) {
return redisClient
}

return createClient({ ...companionOptions.redisOptions, url: companionOptions.redisUrl })
return createClient(redisOptions)
}
2 changes: 1 addition & 1 deletion packages/@uppy/companion/src/server/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { STORAGE_PREFIX, shortenToken } = require('./Uploader')
*/
module.exports = (server) => {
const wss = new SocketServer({ server })
const redisClient = redis.client()?.v4
const redisClient = redis.client()

// A new connection is usually created when an upload begins,
// or when connection fails while an upload is on-going and,
Expand Down
15 changes: 12 additions & 3 deletions packages/@uppy/companion/src/standalone/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,20 @@ const getConfigFromEnv = () => {
periodicPingCount: process.env.COMPANION_PERIODIC_PING_COUNT
? parseInt(process.env.COMPANION_PERIODIC_PING_COUNT, 10) : undefined,
filePath: process.env.COMPANION_DATADIR,
redisUrl: process.env.COMPANION_REDIS_URL,
redisPubSubScope: process.env.COMPANION_REDIS_PUBSUB_SCOPE,
// adding redisOptions to keep all companion options easily visible
// redisOptions refers to https://www.npmjs.com/package/redis#options-object-properties
redisOptions: {},
// redisOptions refers to https://redis.github.io/ioredis/index.html#RedisOptions
redisOptions: (() => {
try {
const obj = JSON.parse(process.env.COMPANION_REDIS_OPTIONS)
return obj
} catch (e) {
if (process.env.COMPANION_REDIS_OPTIONS) {
console.log('COMPANION_REDIS_OPTIONS parse error', e)
}
return process.env.COMPANION_REDIS_URL
}
})(),
sendSelfEndpoint: process.env.COMPANION_SELF_ENDPOINT,
uploadUrls: uploadUrls ? uploadUrls.split(',') : null,
secret: getSecret('COMPANION_SECRET'),
Expand Down
3 changes: 1 addition & 2 deletions packages/@uppy/companion/src/standalone/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const morgan = require('morgan')
const { URL } = require('node:url')
const session = require('express-session')
const addRequestId = require('express-request-id')()
const connectRedis = require('connect-redis')
const RedisStore = require('connect-redis').default

const logger = require('../server/logger')
const redis = require('../server/redis')
Expand Down Expand Up @@ -110,7 +110,6 @@ module.exports = function server (inputCompanionOptions) {
}

if (companionOptions.redisUrl) {
const RedisStore = connectRedis(session)
const redisClient = redis.client(companionOptions)
// todo next major: change default prefix to something like "companion-session:" and possibly remove this option
sessionOptions.store = new RedisStore({ client: redisClient, prefix: process.env.COMPANION_REDIS_EXPRESS_SESSION_PREFIX || 'sess:' })
Expand Down
Loading

0 comments on commit 3dbaaf5

Please sign in to comment.