const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max));
+const knownEventTypes = [
+ 'update',
+ 'delete',
+ 'notification',
+ 'conversation',
+ 'filters_changed',
+];
+
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
return (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
- const params = [ `stream=${stream}` ];
-
- const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
-
- ws.onopen = connected;
- ws.onmessage = e => received(JSON.parse(e.data));
- ws.onclose = disconnected;
- ws.onreconnect = reconnected;
+ const params = stream.split('&');
+ stream = params.shift();
+
+ if (streamingAPIBaseURL.startsWith('ws')) {
+ params.unshift(`stream=${stream}`);
+ const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
+
+ ws.onopen = connected;
+ ws.onmessage = e => received(JSON.parse(e.data));
+ ws.onclose = disconnected;
+ ws.onreconnect = reconnected;
+
+ return ws;
+ }
+
+ params.push(`access_token=${accessToken}`);
+ const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`);
+
+ let firstConnect = true;
+ es.onopen = () => {
+ if (firstConnect) {
+ firstConnect = false;
+ connected();
+ } else {
+ reconnected();
+ }
+ };
+ for (let type of knownEventTypes) {
+ es.addEventListener(type, (e) => {
+ received({
+ event: e.type,
+ payload: e.data,
+ });
+ });
+ }
+ es.onerror = disconnected;
- return ws;
+ return es;
};