diff options
Diffstat (limited to 'streaming/index.js')
-rw-r--r-- | streaming/index.js | 124 |
1 files changed, 112 insertions, 12 deletions
diff --git a/streaming/index.js b/streaming/index.js index 7072d0bd7..3279bd94e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -81,6 +81,19 @@ const redisUrlToClient = (defaultConfig, redisUrl) => { const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); +/** + * @param {string} json + * @return {Object.<string, any>|null} + */ +const parseJSON = (json) => { + try { + return JSON.parse(json); + } catch (err) { + log.error(err); + return null; + } +}; + const startMaster = () => { if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) { log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.'); @@ -217,13 +230,13 @@ const startWorker = (workerId) => { const FALSE_VALUES = [ false, 0, - "0", - "f", - "F", - "false", - "FALSE", - "off", - "OFF" + '0', + 'f', + 'F', + 'false', + 'FALSE', + 'off', + 'OFF', ]; /** @@ -281,7 +294,7 @@ const startWorker = (workerId) => { return; } - client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => { + client.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => { done(); if (err) { @@ -297,6 +310,7 @@ const startWorker = (workerId) => { return; } + req.accessTokenId = result.rows[0].id; req.scopes = result.rows[0].scopes.split(' '); req.accountId = result.rows[0].account_id; req.chosenLanguages = result.rows[0].chosen_languages; @@ -363,6 +377,8 @@ const startWorker = (workerId) => { return 'direct'; case '/api/v1/streaming/list': return 'list'; + default: + return undefined; } }; @@ -438,6 +454,55 @@ const startWorker = (workerId) => { }; /** + * @typedef SystemMessageHandlers + * @property {function(): void} onKill + */ + + /** + * @param {any} req + * @param {SystemMessageHandlers} eventHandlers + * @return {function(string): void} + */ + const createSystemMessageListener = (req, eventHandlers) => { + return message => { + const json = parseJSON(message); + + if (!json) return; + + const { event } = json; + + log.silly(req.requestId, `System message for ${req.accountId}: ${event}`); + + if (event === 'kill') { + log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`); + eventHandlers.onKill(); + } + }; + }; + + /** + * @param {any} req + * @param {any} res + */ + const subscribeHttpToSystemChannel = (req, res) => { + const systemChannelId = `timeline:access_token:${req.accessTokenId}`; + + const listener = createSystemMessageListener(req, { + + onKill () { + res.end(); + }, + + }); + + res.on('close', () => { + unsubscribe(`${redisPrefix}${systemChannelId}`, listener); + }); + + subscribe(`${redisPrefix}${systemChannelId}`, listener); + }; + + /** * @param {any} req * @param {any} res * @param {function(Error=): void} next @@ -449,6 +514,8 @@ const startWorker = (workerId) => { } accountFromRequest(req, alwaysRequireAuth).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { + subscribeHttpToSystemChannel(req, res); + }).then(() => { next(); }).catch(err => { next(err); @@ -465,7 +532,8 @@ const startWorker = (workerId) => { log.error(req.requestId, err.toString()); if (res.headersSent) { - return next(err); + next(err); + return; } res.writeHead(err.status || 500, { 'Content-Type': 'application/json' }); @@ -522,7 +590,11 @@ const startWorker = (workerId) => { log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`); const listener = message => { - const { event, payload, queued_at } = JSON.parse(message); + const json = parseJSON(message); + + if (!json) return; + + const { event, payload, queued_at } = json; const transmit = () => { const now = new Date().getTime(); @@ -888,6 +960,28 @@ const startWorker = (workerId) => { }); /** + * @param {WebSocketSession} session + */ + const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => { + const systemChannelId = `timeline:access_token:${request.accessTokenId}`; + + const listener = createSystemMessageListener(request, { + + onKill () { + socket.close(); + }, + + }); + + subscribe(`${redisPrefix}${systemChannelId}`, listener); + + subscriptions[systemChannelId] = { + listener, + stopHeartbeat: () => {}, + }; + }; + + /** * @param {string|string[]} arrayOrString * @return {string} */ @@ -932,17 +1026,23 @@ const startWorker = (workerId) => { ws.on('error', onEnd); ws.on('message', data => { - const { type, stream, ...params } = JSON.parse(data); + const json = parseJSON(data); + + if (!json) return; + + const { type, stream, ...params } = json; if (type === 'subscribe') { subscribeWebsocketToChannel(session, firstParam(stream), params); } else if (type === 'unsubscribe') { - unsubscribeWebsocketFromChannel(session, firstParam(stream), params) + unsubscribeWebsocketFromChannel(session, firstParam(stream), params); } else { // Unknown action type } }); + subscribeWebsocketToSystemChannel(session); + if (location.query.stream) { subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); } |