const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
+ /**
+ * @type {Object.<string, Array.<function(string): void>>}
+ */
+ const subs = {};
+
const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
};
};
+ /**
+ * @param {string} message
+ * @param {string} channel
+ */
+ const onRedisMessage = (message, channel) => {
+ const callbacks = subs[channel];
+
+ log.silly(`New message on channel ${channel}`);
+
+ if (!callbacks) {
+ return;
+ }
+
+ callbacks.forEach(callback => callback(message));
+ };
+
/**
* @param {string} channel
* @param {function(string): void} callback
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
- redisSubscribeClient.subscribe(channel, callback);
+ subs[channel] = subs[channel] || [];
+
+ if (subs[channel].length === 0) {
+ log.verbose(`Subscribe ${channel}`);
+ redisSubscribeClient.subscribe(channel, onRedisMessage);
+ }
+
+ subs[channel].push(callback);
};
/**
* @param {string} channel
- * @param {function(string): void} callback
*/
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
- redisSubscribeClient.unsubscribe(channel, callback);
+ if (!subs[channel]) {
+ return;
+ }
+
+ subs[channel] = subs[channel].filter(item => item !== callback);
+
+ if (subs[channel].length === 0) {
+ log.verbose(`Unsubscribe ${channel}`);
+ redisSubscribeClient.unsubscribe(channel);
+ delete subs[channel];
+ }
};
const FALSE_VALUES = [