summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorLerk <lukas@k40s.net>2021-12-25 21:55:06 +0000
committerGitHub <noreply@github.com>2021-12-25 22:55:06 +0100
commit4d1eaf3e6e45a2141fd0e7d8deb2e4485485a177 (patch)
tree3ed5040880e24097885a7b1a4cec3949c9ef4789 /streaming
parentfad37dd1bc8504fe6f2939f5d935cc1e5d264458 (diff)
Finish update of node-redis (#17183)
* fix streaming redis client * use console.error instead of console.log * follow node-redis migration guide https://github.com/redis/node-redis/blob/master/docs/v3-to-v4.md * fix config options for node-redis * keep indentation * Update streaming/index.js Co-authored-by: Yamagishi Kazutoshi <ykzts@desire.sh> Co-authored-by: Yamagishi Kazutoshi <ykzts@desire.sh>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js134
1 files changed, 61 insertions, 73 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 8b7477a44c8..74cbf4c2c45 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => {
* @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl
*/
-const redisUrlToClient = (defaultConfig, redisUrl) => {
+const redisUrlToClient = async (defaultConfig, redisUrl) => {
const config = defaultConfig;
+ let client;
+
if (!redisUrl) {
- return redis.createClient(config);
+ client = redis.createClient(config);
+ } else if (redisUrl.startsWith('unix://')) {
+ client = redis.createClient(Object.assign(config, {
+ socket: {
+ path: redisUrl.slice(7),
+ },
+ }));
+ } else {
+ client = redis.createClient(Object.assign(config, {
+ url: redisUrl,
+ }));
}
- if (redisUrl.startsWith('unix://')) {
- return redis.createClient(redisUrl.slice(7), config);
- }
+ client.on('error', (err) => log.error('Redis Client Error!', err));
+ await client.connect();
- return redis.createClient(Object.assign(config, {
- url: redisUrl,
- }));
+ return client;
};
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
@@ -102,7 +111,7 @@ const startMaster = () => {
log.warn(`Starting streaming API server master with ${numWorkers} workers`);
};
-const startWorker = (workerId) => {
+const startWorker = async (workerId) => {
log.warn(`Starting worker ${workerId}`);
const pgConfigs = {
@@ -127,7 +136,7 @@ const startWorker = (workerId) => {
if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
pgConfigs.development.ssl = true;
- pgConfigs.production.ssl = true;
+ pgConfigs.production.ssl = true;
}
const app = express();
@@ -139,9 +148,11 @@ const startWorker = (workerId) => {
const redisNamespace = process.env.REDIS_NAMESPACE || null;
const redisParams = {
- host: process.env.REDIS_HOST || '127.0.0.1',
- port: process.env.REDIS_PORT || 6379,
- db: process.env.REDIS_DB || 0,
+ socket: {
+ host: process.env.REDIS_HOST || '127.0.0.1',
+ port: process.env.REDIS_PORT || 6379,
+ },
+ database: process.env.REDIS_DB || 0,
password: process.env.REDIS_PASSWORD || undefined,
};
@@ -151,25 +162,8 @@ const startWorker = (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
- const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
- const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
-
- /**
- * @type {Object.<string, Array.<function(string): void>>}
- */
- const subs = {};
-
- redisSubscribeClient.on('message', (channel, message) => {
- const callbacks = subs[channel];
-
- log.silly(`New message on channel ${channel}`);
-
- if (!callbacks) {
- return;
- }
-
- callbacks.forEach(callback => callback(message));
- });
+ const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
+ const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
/**
* @param {string[]} channels
@@ -197,34 +191,16 @@ const startWorker = (workerId) => {
*/
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
- subs[channel] = subs[channel] || [];
-
- if (subs[channel].length === 0) {
- log.verbose(`Subscribe ${channel}`);
- redisSubscribeClient.subscribe(channel);
- }
- subs[channel].push(callback);
+ redisSubscribeClient.subscribe(channel, callback);
};
/**
* @param {string} channel
- * @param {function(string): void} callback
*/
- const unsubscribe = (channel, callback) => {
- log.silly(`Removing listener for ${channel}`);
+ const unsubscribe = (channel) => {
- if (!subs[channel]) {
- return;
- }
-
- subs[channel] = subs[channel].filter(item => item !== callback);
-
- if (subs[channel].length === 0) {
- log.verbose(`Unsubscribe ${channel}`);
- redisSubscribeClient.unsubscribe(channel);
- delete subs[channel];
- }
+ redisSubscribeClient.unsubscribe(channel);
};
const FALSE_VALUES = [
@@ -365,7 +341,7 @@ const startWorker = (workerId) => {
const { path, query } = req;
const onlyMedia = isTruthy(query.only_media);
- switch(path) {
+ switch (path) {
case '/api/v1/streaming/user':
return 'user';
case '/api/v1/streaming/user/notification':
@@ -496,7 +472,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener(req, {
- onKill () {
+ onKill() {
res.end();
},
@@ -548,7 +524,7 @@ const startWorker = (workerId) => {
};
/**
- * @param {array}
+ * @param {array} arr
* @param {number=} shift
* @return {string}
*/
@@ -590,7 +566,7 @@ const startWorker = (workerId) => {
* @return {function(string): void}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
- const accountId = req.accountId || req.remoteAddress;
+ const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
@@ -602,8 +578,8 @@ const startWorker = (workerId) => {
const { event, payload, queued_at } = json;
const transmit = () => {
- const now = new Date().getTime();
- const delta = now - queued_at;
+ const now = new Date().getTime();
+ const delta = now - queued_at;
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
@@ -617,9 +593,9 @@ const startWorker = (workerId) => {
return;
}
- const unpackedPayload = payload;
+ const unpackedPayload = payload;
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
- const accountDomain = unpackedPayload.account.acct.split('@')[1];
+ const accountDomain = unpackedPayload.account.acct.split('@')[1];
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
@@ -639,7 +615,15 @@ const startWorker = (workerId) => {
}
const queries = [
- client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
+ client.query(`SELECT 1
+ FROM blocks
+ WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
+ OR (account_id = $2 AND target_account_id = $1)
+ UNION
+ SELECT 1
+ FROM mutes
+ WHERE account_id = $1
+ AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
];
if (accountDomain) {
@@ -702,12 +686,12 @@ const startWorker = (workerId) => {
/**
* @param {any} req
* @param {function(): void} [closeHandler]
- * @return {function(string[], function(string): void)}
+ * @return {function(string[]): void}
*/
- const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
+ const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
req.on('close', () => {
ids.forEach(id => {
- unsubscribe(id, listener);
+ unsubscribe(id);
});
if (closeHandler) {
@@ -754,7 +738,7 @@ const startWorker = (workerId) => {
app.get('/api/v1/streaming/*', (req, res) => {
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res);
- const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
+ const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
}).catch(err => {
@@ -797,7 +781,7 @@ const startWorker = (workerId) => {
* @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
*/
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
- switch(name) {
+ switch (name) {
case 'user':
resolve({
channelIds: channelsForUserStream(req),
@@ -927,14 +911,17 @@ const startWorker = (workerId) => {
* @param {StreamParams} params
*/
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
- checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => {
+ checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
+ channelIds,
+ options,
+ }) => {
if (subscriptions[channelIds.join(';')]) {
return;
}
- const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
+ const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
const stopHeartbeat = subscriptionHeartbeat(channelIds);
- const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
+ const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
subscriptions[channelIds.join(';')] = {
listener,
@@ -982,7 +969,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener(request, {
- onKill () {
+ onKill() {
socket.close();
},
@@ -992,7 +979,8 @@ const startWorker = (workerId) => {
subscriptions[systemChannelId] = {
listener,
- stopHeartbeat: () => {},
+ stopHeartbeat: () => {
+ },
};
};
@@ -1011,7 +999,7 @@ const startWorker = (workerId) => {
wss.on('connection', (ws, req) => {
const location = url.parse(req.url, true);
- req.requestId = uuid.v4();
+ req.requestId = uuid.v4();
req.remoteAddress = ws._socket.remoteAddress;
ws.isAlive = true;