summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author猫吸血鬼ディフリス / 猫ロキP <deflis@gmail.com>2017-06-21 03:41:41 +0900
committerEugen Rochko <eugen@zeonfederated.com>2017-06-20 20:41:41 +0200
commitd8ec83280637e53ded67d4938a198cbeb9e8db05 (patch)
tree536e65d18508e00d72cf75da152e0489e432fc6b
parentbab5a18232a163b0c3c6a245f7f95d50d7022b36 (diff)
Fix streaming server. Redis connection subscribe for each channel. (#3828)
-rw-r--r--streaming/index.js12
1 files changed, 9 insertions, 3 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 270ed6f7034..5afdd59619c 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -115,7 +115,7 @@ const startWorker = (workerId) => {
const subs = {};
- redisSubscribeClient.on('pmessage', (_, channel, message) => {
+ redisSubscribeClient.on('message', (channel, message) => {
const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`);
@@ -127,8 +127,6 @@ const startWorker = (workerId) => {
callbacks.forEach(callback => callback(message));
});
- redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`);
-
const subscriptionHeartbeat = (channel) => {
const interval = 6*60;
const tellSubscribed = () => {
@@ -144,12 +142,20 @@ const startWorker = (workerId) => {
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
subs[channel] = subs[channel] || [];
+ if (subs[channel].length === 0) {
+ log.verbose(`Subscribe ${channel}`);
+ redisSubscribeClient.subscribe(channel);
+ }
subs[channel].push(callback);
};
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
subs[channel] = subs[channel].filter(item => item !== callback);
+ if (subs[channel].length === 0) {
+ log.verbose(`Unsubscribe ${channel}`);
+ redisSubscribeClient.unsubscribe(channel);
+ }
};
const allowCrossDomain = (req, res, next) => {