summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmelia Smith <ThisIsMissEm@users.noreply.github.com>2023-07-28 12:06:29 +0200
committerClaire <claire.github-309c@sitedethib.com>2023-07-31 14:33:27 +0200
commit50f4af28b0f1fde03e1a57583bccc1387d6f08bf (patch)
tree025e14de7e605dc87dd7fb63fc5a614fb34fb9f5
parente655b35d7e4ebf9c5a2ab6cb4bf4e950785cfee0 (diff)
Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)
-rw-r--r--streaming/index.js25
1 files changed, 16 insertions, 9 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 46e49524e40..ab3a16030f0 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -228,8 +228,14 @@ const startWorker = async (workerId) => {
};
/**
+ * @callback SubscriptionListener
+ * @param {ReturnType<parseJSON>} json of the message
+ * @returns void
+ */
+
+ /**
* @param {string} channel
- * @param {function(string): void} callback
+ * @param {SubscriptionListener} callback
*/
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`);
@@ -246,7 +252,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} channel
- * @param {function(Object<string, any>): void} callback
+ * @param {SubscriptionListener} callback
*/
const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`);
@@ -618,9 +624,9 @@ const startWorker = async (workerId) => {
* @param {string[]} ids
* @param {any} req
* @param {function(string, string): void} output
- * @param {function(string[], function(string): void): void} attachCloseHandler
+ * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {boolean=} needsFiltering
- * @returns {function(object): void}
+ * @returns {SubscriptionListener}
*/
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress;
@@ -703,7 +709,7 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${id}`, listener);
});
- if (attachCloseHandler) {
+ if (typeof attachCloseHandler === 'function') {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
}
@@ -740,12 +746,13 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {function(): void} [closeHandler]
- * @return {function(string[]): void}
+ * @returns {function(string[], SubscriptionListener): void}
*/
- const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
+
+ const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
req.on('close', () => {
ids.forEach(id => {
- unsubscribe(id);
+ unsubscribe(id, listener);
});
if (closeHandler) {
@@ -956,7 +963,7 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
- * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
+ * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/
/**