import log from 'npmlog'
import url from 'url'
import WebSocket from 'ws'
+import uuid from 'uuid'
const env = process.env.NODE_ENV || 'development'
next()
}
+const setRequestId = (req, res, next) => {
+ req.requestId = uuid.v4()
+ res.header('X-Request-Id', req.requestId)
+
+ next()
+}
+
const accountFromToken = (token, req, next) => {
pgPool.connect((err, client, done) => {
if (err) {
}
const errorMiddleware = (err, req, res, next) => {
- log.error(err)
+ log.error(req.requestId, err)
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }))
}
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
const streamFrom = (redisClient, id, req, output, needsFiltering = false) => {
- log.verbose(`Starting stream from ${id} for ${req.accountId}`)
+ log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
redisClient.on('message', (channel, message) => {
const { event, payload, queued_at } = JSON.parse(message)
const now = new Date().getTime()
const delta = now - queued_at;
- log.silly(`Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
+ log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
output(event, payload)
}
const heartbeat = setInterval(() => res.write(':thump\n'), 15000)
req.on('close', () => {
- log.verbose(`Ending stream for ${req.accountId}`)
+ log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
clearInterval(heartbeat)
redisClient.quit()
})
// Setup stream output to WebSockets
const streamToWs = (req, ws, redisClient) => {
ws.on('close', () => {
- log.verbose(`Ending stream for ${req.accountId}`)
+ log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
redisClient.quit()
})
return (event, payload) => {
+ if (ws.readyState !== ws.OPEN) {
+ log.error(req.requestId, 'Tried writing to closed socket')
+ return
+ }
+
ws.send(JSON.stringify({ event, payload }))
}
}
password: process.env.REDIS_PASSWORD
})
+app.use(setRequestId)
app.use(allowCrossDomain)
app.use(authenticationMiddleware)
app.use(errorMiddleware)
wss.on('connection', ws => {
const location = url.parse(ws.upgradeReq.url, true)
const token = location.query.access_token
- const req = {}
+ const req = { requestId: uuid.v4() }
accountFromToken(token, req, err => {
if (err) {
- log.error(err)
+ log.error(req.requestId, err)
ws.close()
return
}