const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
- accountFromToken(token, allowedScopes, req, next);
+ resolve(accountFromToken(token, req));
+ });
+
+ /**
+ * @param {any} req
+ * @return {string}
+ */
+ const channelNameFromPath = req => {
+ const { path, query } = req;
+ const onlyMedia = isTruthy(query.only_media);
++ const allowLocalOnly = isTruthy(query.allow_local_only);
+
+ switch(path) {
+ case '/api/v1/streaming/user':
+ return 'user';
+ case '/api/v1/streaming/user/notification':
+ return 'user:notification';
+ case '/api/v1/streaming/public':
+ return onlyMedia ? 'public:media' : 'public';
+ case '/api/v1/streaming/public/local':
+ return onlyMedia ? 'public:local:media' : 'public:local';
+ case '/api/v1/streaming/public/remote':
+ return onlyMedia ? 'public:remote:media' : 'public:remote';
+ case '/api/v1/streaming/hashtag':
+ return 'hashtag';
+ case '/api/v1/streaming/hashtag/local':
+ return 'hashtag:local';
+ case '/api/v1/streaming/direct':
+ return 'direct';
+ case '/api/v1/streaming/list':
+ return 'list';
+ }
};
- const PUBLIC_STREAMS = [
+ const PUBLIC_CHANNELS = [
'public',
'public:media',
'public:local',
return;
}
- next(true);
+ resolve();
});
});
- };
+ });
- const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
+ /**
+ * @param {string[]} ids
+ * @param {any} req
+ * @param {function(string, string): void} output
+ * @param {function(string[], function(string): void): void} attachCloseHandler
+ * @param {boolean=} needsFiltering
+ * @param {boolean=} notificationOnly
++ * @param {boolean=} allowLocalOnly
+ * @return {function(string): void}
+ */
+ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => {
const accountId = req.accountId || req.remoteAddress;
const streamType = notificationOnly ? ' (notification)' : '';
app.use(authenticationMiddleware);
app.use(errorMiddleware);
- app.get('/api/v1/streaming/user', (req, res) => {
- const channels = [`timeline:${req.accountId}`];
-
- if (req.deviceId) {
- channels.push(`timeline:${req.accountId}:${req.deviceId}`);
- }
-
- streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels)));
- });
-
- app.get('/api/v1/streaming/user/notification', (req, res) => {
- streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
- });
-
- app.get('/api/v1/streaming/public', (req, res) => {
- const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
- const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
-
- const allowLocalOnly = req.query.allow_local_only === '1' || req.query.allow_local_only === 'true';
-
- streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true, false, allowLocalOnly);
- });
-
- app.get('/api/v1/streaming/public/local', (req, res) => {
- const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
- const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
-
- streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true, false, true);
- });
-
- app.get('/api/v1/streaming/public/remote', (req, res) => {
- const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
- const channel = onlyMedia ? 'timeline:public:remote:media' : 'timeline:public:remote';
-
- streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
- });
-
- app.get('/api/v1/streaming/direct', (req, res) => {
- const channel = `timeline:direct:${req.accountId}`;
- streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
- });
-
- app.get('/api/v1/streaming/hashtag', (req, res) => {
- const { tag } = req.query;
-
- if (!tag || tag.length === 0) {
- httpNotFound(res);
- return;
- }
-
- streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
- });
-
- app.get('/api/v1/streaming/hashtag/local', (req, res) => {
- const { tag } = req.query;
+ app.get('/api/v1/streaming/*', (req, res) => {
+ channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
+ const onSend = streamToHttp(req, res);
+ const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
- if (!tag || tag.length === 0) {
- streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly);
++ streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
+ }).catch(err => {
+ log.verbose(req.requestId, 'Subscription error:', err.toString());
httpNotFound(res);
- return;
- }
-
- streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
- });
-
- app.get('/api/v1/streaming/list', (req, res) => {
- const listId = req.query.list;
-
- authorizeListAccess(listId, req, authorized => {
- if (!authorized) {
- httpNotFound(res);
- return;
- }
-
- const channel = `timeline:list:${listId}`;
- streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
});
});
const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
- wss.on('connection', (ws, req) => {
- const location = url.parse(req.url, true);
- req.requestId = uuid.v4();
- req.remoteAddress = ws._socket.remoteAddress;
-
- let channel;
-
- switch(location.query.stream) {
+ /**
+ * @typedef StreamParams
+ * @property {string} [tag]
+ * @property {string} [list]
+ * @property {string} [only_media]
+ */
+
+ /**
+ * @param {any} req
+ * @param {string} name
+ * @param {StreamParams} params
+ * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
+ */
+ const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
+ switch(name) {
case 'user':
- channel = [`timeline:${req.accountId}`];
-
- if (req.deviceId) {
- channel.push(`timeline:${req.accountId}:${req.deviceId}`);
- }
+ resolve({
+ channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
- options: { needsFiltering: false, notificationOnly: false },
++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+ });
- streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':
- streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
+ resolve({
+ channelIds: [`timeline:${req.accountId}`],
- options: { needsFiltering: false, notificationOnly: true },
++ options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true },
+ });
+
break;
case 'public':
- streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+ resolve({
+ channelIds: ['timeline:public'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) },
++ });
++
+ break;
+ case 'public:allow_local_only':
- streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true);
++ resolve({
++ channelIds: ['timeline:public'],
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+ });
+
break;
case 'public:local':
- streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true);
+ resolve({
+ channelIds: ['timeline:public:local'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+ });
+
break;
case 'public:remote':
- streamFrom('timeline:public:remote', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+ resolve({
+ channelIds: ['timeline:public:remote'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
+ });
+
break;
case 'public:media':
- streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+ resolve({
+ channelIds: ['timeline:public:media'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) },
++ });
++
+ break;
+ case 'public:allow_local_only:media':
- streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true);
++ resolve({
++ channelIds: ['timeline:public:media'],
++ options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true },
+ });
+
break;
case 'public:local:media':
- streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true, false, true);
+ resolve({
+ channelIds: ['timeline:public:local:media'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+ });
+
break;
case 'public:remote:media':
- streamFrom('timeline:public:remote:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+ resolve({
+ channelIds: ['timeline:public:remote:media'],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
+ });
+
break;
case 'direct':
- channel = `timeline:direct:${req.accountId}`;
- streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
+ resolve({
+ channelIds: [`timeline:direct:${req.accountId}`],
- options: { needsFiltering: false, notificationOnly: false },
++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+ });
+
break;
case 'hashtag':
- if (!location.query.tag || location.query.tag.length === 0) {
- ws.close();
- return;
+ if (!params.tag || params.tag.length === 0) {
+ reject('No tag for stream provided');
+ } else {
+ resolve({
+ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+ });
}
- streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'hashtag:local':
- if (!location.query.tag || location.query.tag.length === 0) {
- ws.close();
- return;
+ if (!params.tag || params.tag.length === 0) {
+ reject('No tag for stream provided');
+ } else {
+ resolve({
+ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
- options: { needsFiltering: true, notificationOnly: false },
++ options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+ });
}
- streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
- const listId = location.query.list;
-
- authorizeListAccess(listId, req, authorized => {
- if (!authorized) {
- ws.close();
- return;
- }
-
- channel = `timeline:list:${listId}`;
- streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
+ authorizeListAccess(params.list, req).then(() => {
+ resolve({
+ channelIds: [`timeline:list:${params.list}`],
- options: { needsFiltering: false, notificationOnly: false },
++ options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+ });
+ }).catch(() => {
+ reject('Not authorized to stream this list');
});
+
break;
default:
- ws.close();
+ reject('Unknown stream type');
+ }
+ });
+
+ /**
+ * @param {string} channelName
+ * @param {StreamParams} params
+ * @return {string[]}
+ */
+ const streamNameFromChannelName = (channelName, params) => {
+ if (channelName === 'list') {
+ return [channelName, params.list];
+ } else if (['hashtag', 'hashtag:local'].includes(channelName)) {
+ return [channelName, params.tag];
+ } else {
+ return [channelName];
+ }
+ };
+
+ /**
+ * @typedef WebSocketSession
+ * @property {any} socket
+ * @property {any} request
+ * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
+ */
+
+ /**
+ * @param {WebSocketSession} session
+ * @param {string} channelName
+ * @param {StreamParams} params
+ */
+ const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
+ checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => {
+ if (subscriptions[channelIds.join(';')]) {
+ return;
+ }
+
+ const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
+ const stopHeartbeat = subscriptionHeartbeat(channelIds);
- const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly);
++ const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
+
+ subscriptions[channelIds.join(';')] = {
+ listener,
+ stopHeartbeat,
+ };
+ }).catch(err => {
+ log.verbose(request.requestId, 'Subscription error:', err.toString());
+ socket.send(JSON.stringify({ error: err.toString() }));
+ });
+
+ /**
+ * @param {WebSocketSession} session
+ * @param {string} channelName
+ * @param {StreamParams} params
+ */
+ const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) =>
+ channelNameToIds(request, channelName, params).then(({ channelIds }) => {
+ log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);
+
+ const { listener, stopHeartbeat } = subscriptions[channelIds.join(';')];
+
+ if (!listener) {
+ return;
+ }
+
+ channelIds.forEach(channelId => {
+ unsubscribe(`${redisPrefix}${channelId}`, listener);
+ });
+
+ stopHeartbeat();
+
+ subscriptions[channelIds.join(';')] = undefined;
+ }).catch(err => {
+ log.verbose(request.requestId, 'Unsubscription error:', err);
+ socket.send(JSON.stringify({ error: err.toString() }));
+ });
+
+ /**
+ * @param {string|string[]} arrayOrString
+ * @return {string}
+ */
+ const firstParam = arrayOrString => {
+ if (Array.isArray(arrayOrString)) {
+ return arrayOrString[0];
+ } else {
+ return arrayOrString;
+ }
+ };
+
+ wss.on('connection', (ws, req) => {
+ const location = url.parse(req.url, true);
+
+ req.requestId = uuid.v4();
+ req.remoteAddress = ws._socket.remoteAddress;
+
+ /**
+ * @type {WebSocketSession}
+ */
+ const session = {
+ socket: ws,
+ request: req,
+ subscriptions: {},
+ };
+
+ const onEnd = () => {
+ const keys = Object.keys(session.subscriptions);
+
+ keys.forEach(channelIds => {
+ const { listener, stopHeartbeat } = session.subscriptions[channelIds];
+
+ channelIds.split(';').forEach(channelId => {
+ unsubscribe(`${redisPrefix}${channelId}`, listener);
+ });
+
+ stopHeartbeat();
+ });
+ };
+
+ ws.on('close', onEnd);
+ ws.on('error', onEnd);
+
+ ws.on('message', data => {
+ const { type, stream, ...params } = JSON.parse(data);
+
+ if (type === 'subscribe') {
+ subscribeWebsocketToChannel(session, firstParam(stream), params);
+ } else if (type === 'unsubscribe') {
+ unsubscribeWebsocketFromChannel(session, firstParam(stream), params)
+ } else {
+ // Unknown action type
+ }
+ });
+
+ if (location.query.stream) {
+ subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
}
});