diff options
author | Emelia Smith <ThisIsMissEm@users.noreply.github.com> | 2023-07-28 12:06:29 +0200 |
---|---|---|
committer | Claire <claire.github-309c@sitedethib.com> | 2023-07-31 14:33:14 +0200 |
commit | 8018d478abccae540e4fa172d6e2e30e88fbd202 (patch) | |
tree | fa52832665c423ddcba13c65d6347f73af60a25e | |
parent | fea56403745cf58c35d5e5193c61fd6e96ad1d6a (diff) |
Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)
-rw-r--r-- | streaming/index.js | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/streaming/index.js b/streaming/index.js index 4d0937bf8a3..816a1e379dc 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -229,8 +229,14 @@ const startWorker = async (workerId) => { }; /** + * @callback SubscriptionListener + * @param {ReturnType<parseJSON>} json of the message + * @returns void + */ + + /** * @param {string} channel - * @param {function(string): void} callback + * @param {SubscriptionListener} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -247,7 +253,7 @@ const startWorker = async (workerId) => { /** * @param {string} channel - * @param {function(Object<string, any>): void} callback + * @param {SubscriptionListener} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -625,9 +631,9 @@ const startWorker = async (workerId) => { * @param {string[]} ids * @param {any} req * @param {function(string, string): void} output - * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(object): void} + * @returns {SubscriptionListener} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; @@ -645,6 +651,7 @@ const startWorker = async (workerId) => { // The listener used to process each message off the redis subscription, // message here is an object with an `event` and `payload` property. Some // events also include a queued_at value, but this is being removed shortly. + /** @type {SubscriptionListener} */ const listener = message => { const { event, payload } = message; @@ -837,7 +844,7 @@ const startWorker = async (workerId) => { subscribe(`${redisPrefix}${id}`, listener); }); - if (attachCloseHandler) { + if (typeof attachCloseHandler === 'function') { attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); } @@ -874,12 +881,13 @@ const startWorker = async (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[]): void} + * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { + + const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + unsubscribe(id, listener); }); if (closeHandler) { @@ -1118,7 +1126,7 @@ const startWorker = async (workerId) => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions + * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions */ /** |