From: Thibaut Girka Date: Tue, 11 Aug 2020 17:19:27 +0000 (+0200) Subject: Merge branch 'master' into glitch-soc/master X-Git-Url: https://git.xn--scling-oua.cat.family/?a=commitdiff_plain;h=78fa15d08ff0b840b8b95f87e99a80a109110530;p=mastodon.git Merge branch 'master' into glitch-soc/master Conflicts: - `streaming/index.js`: Upstream entirely refactored it. Ported our changes to upstream's refactor. Hopefuly. --- 78fa15d08ff0b840b8b95f87e99a80a109110530 diff --cc streaming/index.js index f69064e33,7c0c6a465..f5c9b4224 --- a/streaming/index.js +++ b/streaming/index.js @@@ -267,10 -332,40 +332,41 @@@ const startWorker = (workerId) => const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken; - accountFromToken(token, allowedScopes, req, next); + resolve(accountFromToken(token, req)); + }); + + /** + * @param {any} req + * @return {string} + */ + const channelNameFromPath = req => { + const { path, query } = req; + const onlyMedia = isTruthy(query.only_media); ++ const allowLocalOnly = isTruthy(query.allow_local_only); + + switch(path) { + case '/api/v1/streaming/user': + return 'user'; + case '/api/v1/streaming/user/notification': + return 'user:notification'; + case '/api/v1/streaming/public': + return onlyMedia ? 'public:media' : 'public'; + case '/api/v1/streaming/public/local': + return onlyMedia ? 'public:local:media' : 'public:local'; + case '/api/v1/streaming/public/remote': + return onlyMedia ? 'public:remote:media' : 'public:remote'; + case '/api/v1/streaming/hashtag': + return 'hashtag'; + case '/api/v1/streaming/hashtag/local': + return 'hashtag:local'; + case '/api/v1/streaming/direct': + return 'direct'; + case '/api/v1/streaming/list': + return 'list'; + } }; - const PUBLIC_STREAMS = [ + const PUBLIC_CHANNELS = [ 'public', 'public:media', 'public:local', @@@ -358,12 -500,21 +501,22 @@@ return; } - next(true); + resolve(); }); }); - }; + }); + /** + * @param {string[]} ids + * @param {any} req + * @param {function(string, string): void} output + * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {boolean=} needsFiltering + * @param {boolean=} notificationOnly ++ * @param {boolean=} allowLocalOnly + * @return {function(string): void} + */ - const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { + const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => { const accountId = req.accountId || req.remoteAddress; const streamType = notificationOnly ? ' (notification)' : ''; @@@ -545,165 -681,267 +689,281 @@@ app.use(authenticationMiddleware); app.use(errorMiddleware); - app.get('/api/v1/streaming/user', (req, res) => { - const channels = [`timeline:${req.accountId}`]; - - if (req.deviceId) { - channels.push(`timeline:${req.accountId}:${req.deviceId}`); - } - - streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels))); - }); - - app.get('/api/v1/streaming/user/notification', (req, res) => { - streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true); - }); - - app.get('/api/v1/streaming/public', (req, res) => { - const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true'; - const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public'; - - const allowLocalOnly = req.query.allow_local_only === '1' || req.query.allow_local_only === 'true'; - - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true, false, allowLocalOnly); - }); - - app.get('/api/v1/streaming/public/local', (req, res) => { - const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true'; - const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local'; - - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true, false, true); - }); - - app.get('/api/v1/streaming/public/remote', (req, res) => { - const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true'; - const channel = onlyMedia ? 'timeline:public:remote:media' : 'timeline:public:remote'; - - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true); - }); - - app.get('/api/v1/streaming/direct', (req, res) => { - const channel = `timeline:direct:${req.accountId}`; - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true); - }); - - app.get('/api/v1/streaming/hashtag', (req, res) => { - const { tag } = req.query; - - if (!tag || tag.length === 0) { - httpNotFound(res); - return; - } - - streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true); - }); - - app.get('/api/v1/streaming/hashtag/local', (req, res) => { - const { tag } = req.query; + app.get('/api/v1/streaming/*', (req, res) => { + channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { + const onSend = streamToHttp(req, res); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - if (!tag || tag.length === 0) { - streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly); ++ streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + }).catch(err => { + log.verbose(req.requestId, 'Subscription error:', err.toString()); httpNotFound(res); - return; - } - - streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true); - }); - - app.get('/api/v1/streaming/list', (req, res) => { - const listId = req.query.list; - - authorizeListAccess(listId, req, authorized => { - if (!authorized) { - httpNotFound(res); - return; - } - - const channel = `timeline:list:${listId}`; - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel))); }); }); const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient }); - wss.on('connection', (ws, req) => { - const location = url.parse(req.url, true); - req.requestId = uuid.v4(); - req.remoteAddress = ws._socket.remoteAddress; - - let channel; - - switch(location.query.stream) { + /** + * @typedef StreamParams + * @property {string} [tag] + * @property {string} [list] + * @property {string} [only_media] + */ + + /** + * @param {any} req + * @param {string} name + * @param {StreamParams} params + * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>} + */ + const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { + switch(name) { case 'user': - channel = [`timeline:${req.accountId}`]; - - if (req.deviceId) { - channel.push(`timeline:${req.accountId}:${req.deviceId}`); - } + resolve({ + channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false }, ++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + }); - streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); break; case 'user:notification': - streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true); + resolve({ + channelIds: [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: true }, ++ options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true }, + }); + break; case 'public': - streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true); + resolve({ + channelIds: ['timeline:public'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) }, ++ }); ++ + break; + case 'public:allow_local_only': - streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true); ++ resolve({ ++ channelIds: ['timeline:public'], ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + }); + break; case 'public:local': - streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true); + resolve({ + channelIds: ['timeline:public:local'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + }); + break; case 'public:remote': - streamFrom('timeline:public:remote', req, streamToWs(req, ws), streamWsEnd(req, ws), true); + resolve({ + channelIds: ['timeline:public:remote'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + }); + break; case 'public:media': - streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true); + resolve({ + channelIds: ['timeline:public:media'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) }, ++ }); ++ + break; + case 'public:allow_local_only:media': - streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true); ++ resolve({ ++ channelIds: ['timeline:public:media'], ++ options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true }, + }); + break; case 'public:local:media': - streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true); + resolve({ + channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + }); + break; case 'public:remote:media': - streamFrom('timeline:public:remote:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true); + resolve({ + channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + }); + break; case 'direct': - channel = `timeline:direct:${req.accountId}`; - streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true); + resolve({ + channelIds: [`timeline:direct:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false }, ++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + }); + break; case 'hashtag': - if (!location.query.tag || location.query.tag.length === 0) { - ws.close(); - return; + if (!params.tag || params.tag.length === 0) { + reject('No tag for stream provided'); + } else { + resolve({ + channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + }); } - streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); break; case 'hashtag:local': - if (!location.query.tag || location.query.tag.length === 0) { - ws.close(); - return; + if (!params.tag || params.tag.length === 0) { + reject('No tag for stream provided'); + } else { + resolve({ + channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`], - options: { needsFiltering: true, notificationOnly: false }, ++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + }); } - streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); break; case 'list': - const listId = location.query.list; - - authorizeListAccess(listId, req, authorized => { - if (!authorized) { - ws.close(); - return; - } - - channel = `timeline:list:${listId}`; - streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); + authorizeListAccess(params.list, req).then(() => { + resolve({ + channelIds: [`timeline:list:${params.list}`], - options: { needsFiltering: false, notificationOnly: false }, ++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + }); + }).catch(() => { + reject('Not authorized to stream this list'); }); + break; default: - ws.close(); + reject('Unknown stream type'); + } + }); + + /** + * @param {string} channelName + * @param {StreamParams} params + * @return {string[]} + */ + const streamNameFromChannelName = (channelName, params) => { + if (channelName === 'list') { + return [channelName, params.list]; + } else if (['hashtag', 'hashtag:local'].includes(channelName)) { + return [channelName, params.tag]; + } else { + return [channelName]; + } + }; + + /** + * @typedef WebSocketSession + * @property {any} socket + * @property {any} request + * @property {Object.} subscriptions + */ + + /** + * @param {WebSocketSession} session + * @param {string} channelName + * @param {StreamParams} params + */ + const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => + checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => { + if (subscriptions[channelIds.join(';')]) { + return; + } + + const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); + const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly); ++ const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + + subscriptions[channelIds.join(';')] = { + listener, + stopHeartbeat, + }; + }).catch(err => { + log.verbose(request.requestId, 'Subscription error:', err.toString()); + socket.send(JSON.stringify({ error: err.toString() })); + }); + + /** + * @param {WebSocketSession} session + * @param {string} channelName + * @param {StreamParams} params + */ + const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => + channelNameToIds(request, channelName, params).then(({ channelIds }) => { + log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`); + + const { listener, stopHeartbeat } = subscriptions[channelIds.join(';')]; + + if (!listener) { + return; + } + + channelIds.forEach(channelId => { + unsubscribe(`${redisPrefix}${channelId}`, listener); + }); + + stopHeartbeat(); + + subscriptions[channelIds.join(';')] = undefined; + }).catch(err => { + log.verbose(request.requestId, 'Unsubscription error:', err); + socket.send(JSON.stringify({ error: err.toString() })); + }); + + /** + * @param {string|string[]} arrayOrString + * @return {string} + */ + const firstParam = arrayOrString => { + if (Array.isArray(arrayOrString)) { + return arrayOrString[0]; + } else { + return arrayOrString; + } + }; + + wss.on('connection', (ws, req) => { + const location = url.parse(req.url, true); + + req.requestId = uuid.v4(); + req.remoteAddress = ws._socket.remoteAddress; + + /** + * @type {WebSocketSession} + */ + const session = { + socket: ws, + request: req, + subscriptions: {}, + }; + + const onEnd = () => { + const keys = Object.keys(session.subscriptions); + + keys.forEach(channelIds => { + const { listener, stopHeartbeat } = session.subscriptions[channelIds]; + + channelIds.split(';').forEach(channelId => { + unsubscribe(`${redisPrefix}${channelId}`, listener); + }); + + stopHeartbeat(); + }); + }; + + ws.on('close', onEnd); + ws.on('error', onEnd); + + ws.on('message', data => { + const { type, stream, ...params } = JSON.parse(data); + + if (type === 'subscribe') { + subscribeWebsocketToChannel(session, firstParam(stream), params); + } else if (type === 'unsubscribe') { + unsubscribeWebsocketFromChannel(session, firstParam(stream), params) + } else { + // Unknown action type + } + }); + + if (location.query.stream) { + subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); } });