diff options
author | Emelia Smith <ThisIsMissEm@users.noreply.github.com> | 2023-07-28 12:06:29 +0200 |
---|---|---|
committer | Claire <claire.github-309c@sitedethib.com> | 2023-07-31 14:33:27 +0200 |
commit | 50f4af28b0f1fde03e1a57583bccc1387d6f08bf (patch) | |
tree | 025e14de7e605dc87dd7fb63fc5a614fb34fb9f5 | |
parent | e655b35d7e4ebf9c5a2ab6cb4bf4e950785cfee0 (diff) |
Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228)
-rw-r--r-- | streaming/index.js | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/streaming/index.js b/streaming/index.js index 46e49524e40..ab3a16030f0 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -228,8 +228,14 @@ const startWorker = async (workerId) => { }; /** + * @callback SubscriptionListener + * @param {ReturnType<parseJSON>} json of the message + * @returns void + */ + + /** * @param {string} channel - * @param {function(string): void} callback + * @param {SubscriptionListener} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -246,7 +252,7 @@ const startWorker = async (workerId) => { /** * @param {string} channel - * @param {function(Object<string, any>): void} callback + * @param {SubscriptionListener} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -618,9 +624,9 @@ const startWorker = async (workerId) => { * @param {string[]} ids * @param {any} req * @param {function(string, string): void} output - * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(object): void} + * @returns {SubscriptionListener} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; @@ -703,7 +709,7 @@ const startWorker = async (workerId) => { subscribe(`${redisPrefix}${id}`, listener); }); - if (attachCloseHandler) { + if (typeof attachCloseHandler === 'function') { attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); } @@ -740,12 +746,13 @@ const startWorker = async (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[]): void} + * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { + + const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + unsubscribe(id, listener); }); if (closeHandler) { @@ -956,7 +963,7 @@ const startWorker = async (workerId) => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions + * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions */ /** |