diff --git a/.prettierignore b/.prettierignore index 6b2f0c1889..e9f288ed00 100644 --- a/.prettierignore +++ b/.prettierignore @@ -83,3 +83,4 @@ AUTHORS.md # Process a few selected JS files !lint-staged.config.js +!/streaming/*.js diff --git a/streaming/errors.js b/streaming/errors.js index 6c44d2cb8f..164826238d 100644 --- a/streaming/errors.js +++ b/streaming/errors.js @@ -29,7 +29,7 @@ export class RequestError extends Error { */ constructor(message) { super(message); - this.name = "RequestError"; + this.name = 'RequestError'; this.status = 400; } } @@ -40,7 +40,7 @@ export class AuthenticationError extends Error { */ constructor(message) { super(message); - this.name = "AuthenticationError"; + this.name = 'AuthenticationError'; this.status = 401; } } diff --git a/streaming/index.js b/streaming/index.js index 154ecbc02c..2e138a861d 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -1,3 +1,4 @@ +/* eslint-disable indent */ // @ts-check import fs from 'node:fs'; @@ -14,8 +15,18 @@ import pg from 'pg'; import pgConnectionString from 'pg-connection-string'; import WebSocket from 'ws'; -import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js'; -import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js'; +import { + AuthenticationError, + RequestError, + extractStatusAndMessage as extractErrorStatusAndMessage, +} from './errors.js'; +import { + logger, + httpLogger, + initializeLogLevel, + attachWebsocketHttpLogger, + createWebsocketLogger, +} from './logging.js'; import { setupMetrics } from './metrics.js'; import { isTruthy, normalizeHashtag, firstParam } from './utils.js'; @@ -24,13 +35,11 @@ const environment = process.env.NODE_ENV || 'development'; // Correctly detect and load .env or .env.production file based on environment: const dotenvFile = environment === 'production' ? '.env.production' : '.env'; const dotenvFilePath = path.resolve( - url.fileURLToPath( - new URL(path.join('..', dotenvFile), import.meta.url) - ) + url.fileURLToPath(new URL(path.join('..', dotenvFile), import.meta.url)), ); dotenv.config({ - path: dotenvFilePath + path: dotenvFilePath, }); initializeLogLevel(process.env, environment); @@ -86,9 +95,15 @@ const parseJSON = (json, req) => { */ if (req) { if (req.accountId) { - req.log.error({ err }, `Error parsing message from user ${req.accountId}`); + req.log.error( + { err }, + `Error parsing message from user ${req.accountId}`, + ); } else { - req.log.error({ err }, `Error parsing message from ${req.remoteAddress}`); + req.log.error( + { err }, + `Error parsing message from ${req.remoteAddress}`, + ); } } else { logger.error({ err }, `Error parsing message from redis`); @@ -129,7 +144,7 @@ const pgConfigFromEnv = (env) => { password: env.DB_PASS || pg.defaults.password, database: env.DB_NAME || 'mastodon_development', host: env.DB_HOST || pg.defaults.host, - port: parseIntFromEnv(env.DB_PORT, pg.defaults.port ?? 5432, 'DB_PORT') + port: parseIntFromEnv(env.DB_PORT, pg.defaults.port ?? 
5432, 'DB_PORT'), }, production: { @@ -137,7 +152,7 @@ const pgConfigFromEnv = (env) => { password: env.DB_PASS || '', database: env.DB_NAME || 'mastodon_production', host: env.DB_HOST || 'localhost', - port: parseIntFromEnv(env.DB_PORT, 5432, 'DB_PORT') + port: parseIntFromEnv(env.DB_PORT, 5432, 'DB_PORT'), }, }; @@ -159,25 +174,34 @@ const pgConfigFromEnv = (env) => { // https://github.com/brianc/node-postgres/issues/2280 // // FIXME: clean up once brianc/node-postgres#3128 lands - if (typeof parsedUrl.password === 'string') baseConfig.password = parsedUrl.password; + if (typeof parsedUrl.password === 'string') + baseConfig.password = parsedUrl.password; if (typeof parsedUrl.host === 'string') baseConfig.host = parsedUrl.host; if (typeof parsedUrl.user === 'string') baseConfig.user = parsedUrl.user; if (typeof parsedUrl.port === 'string') { const parsedPort = parseInt(parsedUrl.port, 10); if (isNaN(parsedPort)) { - throw new Error('Invalid port specified in DATABASE_URL environment variable'); + throw new Error( + 'Invalid port specified in DATABASE_URL environment variable', + ); } baseConfig.port = parsedPort; } - if (typeof parsedUrl.database === 'string') baseConfig.database = parsedUrl.database; - if (typeof parsedUrl.options === 'string') baseConfig.options = parsedUrl.options; + if (typeof parsedUrl.database === 'string') + baseConfig.database = parsedUrl.database; + if (typeof parsedUrl.options === 'string') + baseConfig.options = parsedUrl.options; // The pg-connection-string type definition isn't correct, as parsedUrl.ssl // can absolutely be an Object, this is to work around these incorrect // types, including the casting of parsedUrl.ssl to Record if (typeof parsedUrl.ssl === 'boolean') { baseConfig.ssl = parsedUrl.ssl; - } else if (typeof parsedUrl.ssl === 'object' && !Array.isArray(parsedUrl.ssl) && parsedUrl.ssl !== null) { + } else if ( + typeof parsedUrl.ssl === 'object' && + !Array.isArray(parsedUrl.ssl) && + parsedUrl.ssl !== null + ) { /** @type {Record} */ const sslOptions = parsedUrl.ssl; baseConfig.ssl = {}; @@ -196,17 +220,17 @@ const pgConfigFromEnv = (env) => { baseConfig = pgConfigs[environment]; if (env.DB_SSLMODE) { - switch(env.DB_SSLMODE) { - case 'disable': - case '': - baseConfig.ssl = false; - break; - case 'no-verify': - baseConfig.ssl = { rejectUnauthorized: false }; - break; - default: - baseConfig.ssl = {}; - break; + switch (env.DB_SSLMODE) { + case 'disable': + case '': + baseConfig.ssl = false; + break; + case 'no-verify': + baseConfig.ssl = { rejectUnauthorized: false }; + break; + default: + baseConfig.ssl = {}; + break; } } } else { @@ -283,7 +307,7 @@ const CHANNEL_NAMES = [ 'user:notification', 'list', 'direct', - ...PUBLIC_CHANNELS + ...PUBLIC_CHANNELS, ]; const startServer = async () => { @@ -292,13 +316,18 @@ const startServer = async () => { const wss = new WebSocket.Server({ noServer: true }); // Set the X-Request-Id header on WebSockets: - wss.on("headers", function onHeaders(headers, req) { + wss.on('headers', function onHeaders(headers, req) { headers.push(`X-Request-Id: ${req.id}`); }); const app = express(); - app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal'); + app.set( + 'trust proxy', + process.env.TRUSTED_PROXY_IP + ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) + : 'loopback,uniquelocal', + ); app.use(httpLogger); app.use(cors()); @@ -312,7 +341,7 @@ const startServer = async () => { // logger. This decorates the `request` object. 
attachWebsocketHttpLogger(request); - request.log.info("HTTP Upgrade Requested"); + request.log.info('HTTP Upgrade Requested'); /** @param {Error} err */ const onSocketError = (err) => { @@ -330,15 +359,15 @@ const startServer = async () => { // Unfortunately for using the on('upgrade') setup, we need to manually // write a HTTP Response to the Socket to close the connection upgrade // attempt, so the following code is to handle all of that. - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); /** @type {Record} */ const headers = { - 'Connection': 'close', + Connection: 'close', 'Content-Type': 'text/plain', 'Content-Length': 0, 'X-Request-Id': request.id, - 'X-Error-Message': errorMessage + 'X-Error-Message': errorMessage, }; // Ensure the socket is closed once we've finished writing to it: @@ -347,16 +376,25 @@ const startServer = async () => { }); // Write the HTTP response manually: - socket.end(`HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode]}\r\n${Object.keys(headers).map((key) => `${key}: ${headers[key]}`).join('\r\n')}\r\n\r\n`); + socket.end( + `HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode]}\r\n${Object.keys( + headers, + ) + .map((key) => `${key}: ${headers[key]}`) + .join('\r\n')}\r\n\r\n`, + ); // Finally, log the error: - request.log.error({ - err, - res: { - statusCode, - headers - } - }, errorMessage); + request.log.error( + { + err, + res: { + statusCode, + headers, + }, + }, + errorMessage, + ); return; } @@ -365,7 +403,9 @@ const startServer = async () => { socket.removeListener('error', onSocketError); wss.handleUpgrade(request, socket, head, function done(ws) { - request.log.info("Authenticated request & upgraded to WebSocket connection"); + request.log.info( + 'Authenticated request & upgraded to WebSocket connection', + ); const wsLogger = createWebsocketLogger(request, resolvedAccount); @@ -418,11 +458,18 @@ const startServer = async () => { * @param {string[]} channels * @returns {function(): void} */ - const subscriptionHeartbeat = channels => { + const subscriptionHeartbeat = (channels) => { const interval = 6 * 60; const tellSubscribed = () => { - channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3)); + channels.forEach((channel) => + redisClient.set( + `${redisPrefix}subscribed:${channel}`, + '1', + 'EX', + interval * 3, + ), + ); }; tellSubscribed(); @@ -452,9 +499,9 @@ const startServer = async () => { const json = parseJSON(message, null); if (!json) return; - callbacks.forEach(callback => callback(json)); + callbacks.forEach((callback) => callback(json)); }; - redisSubscribeClient.on("message", onRedisMessage); + redisSubscribeClient.on('message', onRedisMessage); /** * @callback SubscriptionListener @@ -496,7 +543,7 @@ const startServer = async () => { return; } - subs[channel] = subs[channel].filter(item => item !== callback); + subs[channel] = subs[channel].filter((item) => item !== callback); if (subs[channel].length === 0) { logger.debug(`Unsubscribe ${channel}`); @@ -517,99 +564,108 @@ const startServer = async () => { * @returns {boolean} */ const isInScope = (req, necessaryScopes) => - req.scopes.some(scope => necessaryScopes.includes(scope)); + req.scopes.some((scope) => necessaryScopes.includes(scope)); /** * @param {string} token * @param {any} req * @returns {Promise} */ - const accountFromToken = (token, req) => new Promise((resolve, reject) => { - pgPool.connect((err, client, done) 
=> { - if (err) { - reject(err); - return; - } - - // @ts-ignore - client.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => { - done(); - + const accountFromToken = (token, req) => + new Promise((resolve, reject) => { + pgPool.connect((err, client, done) => { if (err) { reject(err); return; } - if (result.rows.length === 0) { - reject(new AuthenticationError('Invalid access token')); - return; - } + // @ts-ignore + client.query( + 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', + [token], + (err, result) => { + done(); - req.accessTokenId = result.rows[0].id; - req.scopes = result.rows[0].scopes.split(' '); - req.accountId = result.rows[0].account_id; - req.chosenLanguages = result.rows[0].chosen_languages; - req.deviceId = result.rows[0].device_id; + if (err) { + reject(err); + return; + } - resolve({ - accessTokenId: result.rows[0].id, - scopes: result.rows[0].scopes.split(' '), - accountId: result.rows[0].account_id, - chosenLanguages: result.rows[0].chosen_languages, - deviceId: result.rows[0].device_id - }); + if (result.rows.length === 0) { + reject(new AuthenticationError('Invalid access token')); + return; + } + + req.accessTokenId = result.rows[0].id; + req.scopes = result.rows[0].scopes.split(' '); + req.accountId = result.rows[0].account_id; + req.chosenLanguages = result.rows[0].chosen_languages; + req.deviceId = result.rows[0].device_id; + + resolve({ + accessTokenId: result.rows[0].id, + scopes: result.rows[0].scopes.split(' '), + accountId: result.rows[0].account_id, + chosenLanguages: result.rows[0].chosen_languages, + deviceId: result.rows[0].device_id, + }); + }, + ); }); }); - }); /** * @param {any} req * @returns {Promise} */ - const accountFromRequest = (req) => new Promise((resolve, reject) => { - const authorization = req.headers.authorization; - const location = url.parse(req.url, true); - const accessToken = location.query.access_token || req.headers['sec-websocket-protocol']; + const accountFromRequest = (req) => + new Promise((resolve, reject) => { + const authorization = req.headers.authorization; + const location = url.parse(req.url, true); + const accessToken = + location.query.access_token || req.headers['sec-websocket-protocol']; - if (!authorization && !accessToken) { - reject(new AuthenticationError('Missing access token')); - return; - } + if (!authorization && !accessToken) { + reject(new AuthenticationError('Missing access token')); + return; + } - const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken; + const token = authorization + ? 
authorization.replace(/^Bearer /, '') + : accessToken; - resolve(accountFromToken(token, req)); - }); + resolve(accountFromToken(token, req)); + }); /** * @param {any} req * @returns {string|undefined} */ - const channelNameFromPath = req => { + const channelNameFromPath = (req) => { const { path, query } = req; const onlyMedia = isTruthy(query.only_media); switch (path) { - case '/api/v1/streaming/user': - return 'user'; - case '/api/v1/streaming/user/notification': - return 'user:notification'; - case '/api/v1/streaming/public': - return onlyMedia ? 'public:media' : 'public'; - case '/api/v1/streaming/public/local': - return onlyMedia ? 'public:local:media' : 'public:local'; - case '/api/v1/streaming/public/remote': - return onlyMedia ? 'public:remote:media' : 'public:remote'; - case '/api/v1/streaming/hashtag': - return 'hashtag'; - case '/api/v1/streaming/hashtag/local': - return 'hashtag:local'; - case '/api/v1/streaming/direct': - return 'direct'; - case '/api/v1/streaming/list': - return 'list'; - default: - return undefined; + case '/api/v1/streaming/user': + return 'user'; + case '/api/v1/streaming/user/notification': + return 'user:notification'; + case '/api/v1/streaming/public': + return onlyMedia ? 'public:media' : 'public'; + case '/api/v1/streaming/public/local': + return onlyMedia ? 'public:local:media' : 'public:local'; + case '/api/v1/streaming/public/remote': + return onlyMedia ? 'public:remote:media' : 'public:remote'; + case '/api/v1/streaming/hashtag': + return 'hashtag'; + case '/api/v1/streaming/hashtag/local': + return 'hashtag:local'; + case '/api/v1/streaming/direct': + return 'direct'; + case '/api/v1/streaming/list': + return 'list'; + default: + return undefined; } }; @@ -619,38 +675,48 @@ const startServer = async () => { * @param {string|undefined} channelName * @returns {Promise.} */ - const checkScopes = (req, logger, channelName) => new Promise((resolve, reject) => { - logger.debug(`Checking OAuth scopes for ${channelName}`); + const checkScopes = (req, logger, channelName) => + new Promise((resolve, reject) => { + logger.debug(`Checking OAuth scopes for ${channelName}`); - // When accessing public channels, no scopes are needed - if (channelName && PUBLIC_CHANNELS.includes(channelName)) { - resolve(); - return; - } + // When accessing public channels, no scopes are needed + if (channelName && PUBLIC_CHANNELS.includes(channelName)) { + resolve(); + return; + } - // The `read` scope has the highest priority, if the token has it - // then it can access all streams - const requiredScopes = ['read']; + // The `read` scope has the highest priority, if the token has it + // then it can access all streams + const requiredScopes = ['read']; - // When accessing specifically the notifications stream, - // we need a read:notifications, while in all other cases, - // we can allow access with read:statuses. Mind that the - // user stream will not contain notifications unless - // the token has either read or read:notifications scope - // as well, this is handled separately. - if (channelName === 'user:notification') { - requiredScopes.push('read:notifications'); - } else { - requiredScopes.push('read:statuses'); - } + // When accessing specifically the notifications stream, + // we need a read:notifications, while in all other cases, + // we can allow access with read:statuses. Mind that the + // user stream will not contain notifications unless + // the token has either read or read:notifications scope + // as well, this is handled separately. 
+ if (channelName === 'user:notification') { + requiredScopes.push('read:notifications'); + } else { + requiredScopes.push('read:statuses'); + } - if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { - resolve(); - return; - } + if ( + req.scopes && + requiredScopes.some((requiredScope) => + req.scopes.includes(requiredScope), + ) + ) { + resolve(); + return; + } - reject(new AuthenticationError('Access token does not have the required scopes')); - }); + reject( + new AuthenticationError( + 'Access token does not have the required scopes', + ), + ); + }); /** * @typedef SystemMessageHandlers @@ -663,7 +729,7 @@ const startServer = async () => { * @returns {SubscriptionListener} */ const createSystemMessageListener = (req, eventHandlers) => { - return message => { + return (message) => { if (!message?.event) { return; } @@ -673,7 +739,9 @@ const startServer = async () => { req.log.debug(`System message for ${req.accountId}: ${event}`); if (event === 'kill') { - req.log.debug(`Closing connection for ${req.accountId} due to expired access token`); + req.log.debug( + `Closing connection for ${req.accountId} due to expired access token`, + ); eventHandlers.onKill(); } else if (event === 'filters_changed') { req.log.debug(`Invalidating filters cache for ${req.accountId}`); @@ -700,7 +768,9 @@ const startServer = async () => { unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); unsubscribe(`${redisPrefix}${systemChannelId}`, listener); - connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); + connectedChannels + .labels({ type: 'eventsource', channel: 'system' }) + .dec(2); }); subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); @@ -729,13 +799,17 @@ const startServer = async () => { return; } - accountFromRequest(req).then(() => checkScopes(req, req.log, channelName)).then(() => { - subscribeHttpToSystemChannel(req, res); - }).then(() => { - next(); - }).catch(err => { - next(err); - }); + accountFromRequest(req) + .then(() => checkScopes(req, req.log, channelName)) + .then(() => { + subscribeHttpToSystemChannel(req, res); + }) + .then(() => { + next(); + }) + .catch((err) => { + next(err); + }); }; /** @@ -752,7 +826,7 @@ const startServer = async () => { return; } - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: errorMessage })); @@ -764,35 +838,45 @@ const startServer = async () => { * @returns {string} */ // @ts-ignore - const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); + const placeholders = (arr, shift = 0) => + arr.map((_, i) => `$${i + 1 + shift}`).join(', '); /** * @param {string} listId * @param {any} req * @returns {Promise.} */ - const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => { - const { accountId } = req; + const authorizeListAccess = (listId, req) => + new Promise((resolve, reject) => { + const { accountId } = req; - pgPool.connect((err, client, done) => { - if (err) { - reject(); - return; - } - - // @ts-ignore - client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (err, result) => { - done(); - - if (err || result.rows.length === 0 || result.rows[0].account_id !== accountId) { + pgPool.connect((err, client, done) => { + if (err) { reject(); return; } - resolve(); + // @ts-ignore + client.query( + 
'SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', + [listId], + (err, result) => { + done(); + + if ( + err || + result.rows.length === 0 || + result.rows[0].account_id !== accountId + ) { + reject(); + return; + } + + resolve(); + }, + ); }); }); - }); /** * @param {string[]} channelIds @@ -804,7 +888,15 @@ const startServer = async () => { * @param {boolean=} needsFiltering * @returns {SubscriptionListener} */ - const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, needsFiltering = false) => { + const streamFrom = ( + channelIds, + req, + log, + output, + attachCloseHandler, + destinationType, + needsFiltering = false, + ) => { log.info({ channelIds }, `Starting stream`); /** @@ -813,11 +905,15 @@ const startServer = async () => { */ const transmit = (event, payload) => { // TODO: Replace "string"-based delete payloads with object payloads: - const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; + const encodedPayload = + typeof payload === 'object' ? JSON.stringify(payload) : payload; messagesSent.labels({ type: destinationType }).inc(1); - log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`); + log.debug( + { event, payload }, + `Transmitting ${event} to ${req.accountId}`, + ); output(event, encodedPayload); }; @@ -827,7 +923,7 @@ const startServer = async () => { // events also include a queued_at value, but this is being removed shortly. /** @type {SubscriptionListener} */ - const listener = message => { + const listener = (message) => { if (!message?.event || !message?.payload) { return; } @@ -844,7 +940,10 @@ const startServer = async () => { // // The channels that need filtering are determined in the function // `channelNameToIds` defined below: - if (!needsFiltering || (event !== 'update' && event !== 'status.update')) { + if ( + !needsFiltering || + (event !== 'update' && event !== 'status.update') + ) { transmit(event, payload); return; } @@ -853,8 +952,14 @@ const startServer = async () => { // filtering of statuses: // Filter based on language: - if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) { - log.debug(`Message ${payload.id} filtered by language (${payload.language})`); + if ( + Array.isArray(req.chosenLanguages) && + payload.language !== null && + req.chosenLanguages.indexOf(payload.language) === -1 + ) { + log.debug( + `Message ${payload.id} filtered by language (${payload.language})`, + ); return; } @@ -866,7 +971,9 @@ const startServer = async () => { // Filter based on domain blocks, blocks, mutes, or custom filters: // @ts-ignore - const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id)); + const targetAccountIds = [payload.account.id].concat( + payload.mentions.map((item) => item.id), + ); const accountDomain = payload.account.acct.split('@')[1]; // TODO: Move this logic out of the message handling loop @@ -878,7 +985,8 @@ const startServer = async () => { const queries = [ // @ts-ignore - client.query(`SELECT 1 + client.query( + `SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) @@ -886,159 +994,208 @@ const startServer = async () => { SELECT 1 FROM mutes WHERE account_id = $1 - AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)), + AND target_account_id IN 
(${placeholders(targetAccountIds, 2)})`, + [req.accountId, payload.account.id].concat(targetAccountIds), + ), ]; if (accountDomain) { // @ts-ignore - queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); + queries.push( + client.query( + 'SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', + [req.accountId, accountDomain], + ), + ); } // @ts-ignore if (!payload.filtered && !req.cachedFilters) { // @ts-ignore - queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId])); + queries.push( + client.query( + 'SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', + [req.accountId], + ), + ); } - Promise.all(queries).then(values => { - releasePgConnection(); + Promise.all(queries) + .then((values) => { + releasePgConnection(); - // Handling blocks & mutes and domain blocks: If one of those applies, - // then we don't transmit the payload of the event to the client - if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) { - return; - } + // Handling blocks & mutes and domain blocks: If one of those applies, + // then we don't transmit the payload of the event to the client + if ( + values[0].rows.length > 0 || + (accountDomain && values[1].rows.length > 0) + ) { + return; + } - // If the payload already contains the `filtered` property, it means - // that filtering has been applied on the ruby on rails side, as - // such, we don't need to construct or apply the filters in streaming: - if (Object.hasOwn(payload, "filtered")) { - transmit(event, payload); - return; - } - - // Handling for constructing the custom filters and caching them on the request - // TODO: Move this logic out of the message handling lifecycle - // @ts-ignore - if (!req.cachedFilters) { - const filterRows = values[accountDomain ? 
2 : 1].rows; + // If the payload already contains the `filtered` property, it means + // that filtering has been applied on the ruby on rails side, as + // such, we don't need to construct or apply the filters in streaming: + if (Object.hasOwn(payload, 'filtered')) { + transmit(event, payload); + return; + } + // Handling for constructing the custom filters and caching them on the request + // TODO: Move this logic out of the message handling lifecycle // @ts-ignore - req.cachedFilters = filterRows.reduce((cache, filter) => { - if (cache[filter.id]) { - cache[filter.id].keywords.push([filter.keyword, filter.whole_word]); - } else { - cache[filter.id] = { - keywords: [[filter.keyword, filter.whole_word]], - expires_at: filter.expires_at, - filter: { - id: filter.id, - title: filter.title, - context: filter.context, - expires_at: filter.expires_at, - // filter.filter_action is the value from the - // custom_filters.action database column, it is an integer - // representing a value in an enum defined by Ruby on Rails: - // - // enum { warn: 0, hide: 1 } - filter_action: ['warn', 'hide'][filter.filter_action], - }, - }; - } + if (!req.cachedFilters) { + const filterRows = values[accountDomain ? 2 : 1].rows; - return cache; - }, {}); - - // Construct the regular expressions for the custom filters: This - // needs to be done in a separate loop as the database returns one - // filterRow per keyword, so we need all the keywords before - // constructing the regular expression - // @ts-ignore - Object.keys(req.cachedFilters).forEach((key) => { // @ts-ignore - req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => { - let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - - if (whole_word) { - if (/^[\w]/.test(expr)) { - expr = `\\b${expr}`; - } - - if (/[\w]$/.test(expr)) { - expr = `${expr}\\b`; - } + req.cachedFilters = filterRows.reduce((cache, filter) => { + if (cache[filter.id]) { + cache[filter.id].keywords.push([ + filter.keyword, + filter.whole_word, + ]); + } else { + cache[filter.id] = { + keywords: [[filter.keyword, filter.whole_word]], + expires_at: filter.expires_at, + filter: { + id: filter.id, + title: filter.title, + context: filter.context, + expires_at: filter.expires_at, + // filter.filter_action is the value from the + // custom_filters.action database column, it is an integer + // representing a value in an enum defined by Ruby on Rails: + // + // enum { warn: 0, hide: 1 } + filter_action: ['warn', 'hide'][filter.filter_action], + }, + }; } - return expr; - }).join('|'), 'i'); - }); - } + return cache; + }, {}); - // Apply cachedFilters against the payload, constructing a - // `filter_results` array of FilterResult entities - // @ts-ignore - if (req.cachedFilters) { - const status = payload; - // TODO: Calculate searchableContent in Ruby on Rails: + // Construct the regular expressions for the custom filters: This + // needs to be done in a separate loop as the database returns one + // filterRow per keyword, so we need all the keywords before + // constructing the regular expression + // @ts-ignore + Object.keys(req.cachedFilters).forEach((key) => { + // @ts-ignore + req.cachedFilters[key].regexp = new RegExp( + req.cachedFilters[key].keywords + .map(([keyword, whole_word]) => { + let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + + if (whole_word) { + if (/^[\w]/.test(expr)) { + expr = `\\b${expr}`; + } + + if (/[\w]$/.test(expr)) { + expr = `${expr}\\b`; + } + } + + return expr; + }) + .join('|'), + 'i', + ); 
+                });
+              }
+
+              // Apply cachedFilters against the payload, constructing a
+              // `filter_results` array of FilterResult entities
          // @ts-ignore
-          const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
-          const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
+              if (req.cachedFilters) {
+                const status = payload;
+                // TODO: Calculate searchableContent in Ruby on Rails:
+                // @ts-ignore
+                const searchableContent = [
+                  status.spoiler_text || '',
+                  status.content,
+                ]
+                  .concat(
+                    status.poll && status.poll.options
+                      ? status.poll.options.map((option) => option.title)
+                      : [],
+                  )
+                  .concat(status.media_attachments.map((att) => att.description))
+                  .join('\n\n')
+                  .replace(/<br\s*\/?>/g, '\n')
+                  .replace(/<\/p><p>/g, '\n\n');
+                const searchableTextContent =
+                  JSDOM.fragment(searchableContent).textContent;
-          const now = new Date();
-          // @ts-ignore
-          const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
-            // Check the filter hasn't expired before applying:
-            if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
-              return results;
-            }
+                const now = new Date();
+                // @ts-ignore
+                const filter_results = Object.values(req.cachedFilters).reduce(
+                  (results, cachedFilter) => {
+                    // Check the filter hasn't expired before applying:
+                    if (
+                      cachedFilter.expires_at !== null &&
+                      cachedFilter.expires_at < now
+                    ) {
+                      return results;
+                    }
-            // Just in case JSDOM fails to find textContent in searchableContent
-            if (!searchableTextContent) {
-              return results;
-            }
+                    // Just in case JSDOM fails to find textContent in searchableContent
+                    if (!searchableTextContent) {
+                      return results;
+                    }
-            const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
-            if (keyword_matches) {
-              // results is an Array of FilterResult; status_matches is always
-              // null as we are only applying the keyword-based custom
-              // filters, not the status-based custom filters.
-              // https://docs.joinmastodon.org/entities/FilterResult/
-              results.push({
-                filter: cachedFilter.filter,
-                keyword_matches,
-                status_matches: null
-              });
-            }
+                    const keyword_matches = searchableTextContent.match(
+                      cachedFilter.regexp,
+                    );
+                    if (keyword_matches) {
+                      // results is an Array of FilterResult; status_matches is always
+                      // null as we are only applying the keyword-based custom
+                      // filters, not the status-based custom filters.
+                      // https://docs.joinmastodon.org/entities/FilterResult/
+                      results.push({
+                        filter: cachedFilter.filter,
+                        keyword_matches,
+                        status_matches: null,
+                      });
+                    }
-            return results;
-          }, []);
+                    return results;
+                  },
+                  [],
+                );
-          // Send the payload + the FilterResults as the `filtered` property
-          // to the streaming connection.
To reach this code, the `event` must + // have been either `update` or `status.update`, meaning the + // `payload` is a Status entity, which has a `filtered` property: + // + // filtered: https://docs.joinmastodon.org/entities/Status/#filtered + transmit(event, { + ...payload, + filtered: filter_results, + }); + } else { + transmit(event, payload); + } + }) + .catch((err) => { + log.error(err); + releasePgConnection(); + }); }); }; - channelIds.forEach(id => { + channelIds.forEach((id) => { subscribe(`${redisPrefix}${id}`, listener); }); if (typeof attachCloseHandler === 'function') { - attachCloseHandler(channelIds.map(id => `${redisPrefix}${id}`), listener); + attachCloseHandler( + channelIds.map((id) => `${redisPrefix}${id}`), + listener, + ); } return listener; @@ -1056,7 +1213,9 @@ const startServer = async () => { // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); + connectedChannels + .labels({ type: 'eventsource', channel: channelName }) + .inc(); } res.setHeader('Content-Type', 'text/event-stream'); @@ -1075,7 +1234,9 @@ const startServer = async () => { connectedClients.labels({ type: 'eventsource' }).dec(); // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); + connectedChannels + .labels({ type: 'eventsource', channel: channelName }) + .dec(); } clearInterval(heartbeat); @@ -1093,17 +1254,19 @@ const startServer = async () => { * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { - req.on('close', () => { - ids.forEach(id => { - unsubscribe(id, listener); - }); + const streamHttpEnd = + (req, closeHandler = undefined) => + (ids, listener) => { + req.on('close', () => { + ids.forEach((id) => { + unsubscribe(id, listener); + }); - if (closeHandler) { - closeHandler(); - } - }); - }; + if (closeHandler) { + closeHandler(); + } + }); + }; /** * @param {http.IncomingMessage} req @@ -1121,7 +1284,7 @@ const startServer = async () => { ws.send(message, (/** @type {Error|undefined} */ err) => { if (err) { - req.log.error({err}, `Failed to send to websocket`); + req.log.error({ err }, `Failed to send to websocket`); } }); }; @@ -1129,7 +1292,7 @@ const startServer = async () => { /** * @param {http.ServerResponse} res */ - const httpNotFound = res => { + const httpNotFound = (res) => { res.writeHead(404, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Not found' })); }; @@ -1152,20 +1315,30 @@ const startServer = async () => { return; } - channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => { - const onSend = streamToHttp(req, res); - const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); + channelNameToIds(req, channelName, req.query) + .then(({ channelIds, options }) => { + const onSend = streamToHttp(req, res); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - // @ts-ignore - streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering); - }).catch(err => { - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + // @ts-ignore + streamFrom( + channelIds, + req, + req.log, + onSend, + onEnd, + 'eventsource', + options.needsFiltering, + ); + }) + 
.catch((err) => { + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); - res.log.info({ err }, 'Eventsource subscription error'); + res.log.info({ err }, 'Eventsource subscription error'); - res.writeHead(statusCode, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: errorMessage })); - }); + res.writeHead(statusCode, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: errorMessage })); + }); }); /** @@ -1179,7 +1352,7 @@ const startServer = async () => { * @param {any} req * @returns {string[]} */ - const channelsForUserStream = req => { + const channelsForUserStream = (req) => { const arr = [`timeline:${req.accountId}`]; if (isInScope(req, ['crypto']) && req.deviceId) { @@ -1199,113 +1372,120 @@ const startServer = async () => { * @param {StreamParams} params * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} */ - const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { - switch (name) { - case 'user': - resolve({ - channelIds: channelsForUserStream(req), - options: { needsFiltering: false }, - }); + const channelNameToIds = (req, name, params) => + new Promise((resolve, reject) => { + switch (name) { + case 'user': + resolve({ + channelIds: channelsForUserStream(req), + options: { needsFiltering: false }, + }); - break; - case 'user:notification': - resolve({ - channelIds: [`timeline:${req.accountId}:notifications`], - options: { needsFiltering: false }, - }); + break; + case 'user:notification': + resolve({ + channelIds: [`timeline:${req.accountId}:notifications`], + options: { needsFiltering: false }, + }); - break; - case 'public': - resolve({ - channelIds: ['timeline:public'], - options: { needsFiltering: true }, - }); + break; + case 'public': + resolve({ + channelIds: ['timeline:public'], + options: { needsFiltering: true }, + }); - break; - case 'public:local': - resolve({ - channelIds: ['timeline:public:local'], - options: { needsFiltering: true }, - }); + break; + case 'public:local': + resolve({ + channelIds: ['timeline:public:local'], + options: { needsFiltering: true }, + }); - break; - case 'public:remote': - resolve({ - channelIds: ['timeline:public:remote'], - options: { needsFiltering: true }, - }); + break; + case 'public:remote': + resolve({ + channelIds: ['timeline:public:remote'], + options: { needsFiltering: true }, + }); - break; - case 'public:media': - resolve({ - channelIds: ['timeline:public:media'], - options: { needsFiltering: true }, - }); + break; + case 'public:media': + resolve({ + channelIds: ['timeline:public:media'], + options: { needsFiltering: true }, + }); - break; - case 'public:local:media': - resolve({ - channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true }, - }); + break; + case 'public:local:media': + resolve({ + channelIds: ['timeline:public:local:media'], + options: { needsFiltering: true }, + }); - break; - case 'public:remote:media': - resolve({ - channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true }, - }); + break; + case 'public:remote:media': + resolve({ + channelIds: ['timeline:public:remote:media'], + options: { needsFiltering: true }, + }); - break; - case 'direct': - resolve({ - channelIds: [`timeline:direct:${req.accountId}`], - options: { needsFiltering: false }, - }); + break; + case 'direct': + resolve({ + channelIds: [`timeline:direct:${req.accountId}`], + options: { needsFiltering: false }, + }); - break; - case 'hashtag': - if (!params.tag) { - 
reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], - options: { needsFiltering: true }, - }); + break; + case 'hashtag': + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); + } else { + resolve({ + channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], + options: { needsFiltering: true }, + }); + } + + break; + case 'hashtag:local': + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); + } else { + resolve({ + channelIds: [ + `timeline:hashtag:${normalizeHashtag(params.tag)}:local`, + ], + options: { needsFiltering: true }, + }); + } + + break; + case 'list': + if (!params.list) { + reject(new RequestError('Missing list name parameter')); + return; + } + + authorizeListAccess(params.list, req) + .then(() => { + resolve({ + channelIds: [`timeline:list:${params.list}`], + options: { needsFiltering: false }, + }); + }) + .catch(() => { + reject( + new AuthenticationError('Not authorized to stream this list'), + ); + }); + + break; + default: + reject(new RequestError('Unknown stream type')); } - - break; - case 'hashtag:local': - if (!params.tag) { - reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`], - options: { needsFiltering: true }, - }); - } - - break; - case 'list': - if (!params.list) { - reject(new RequestError('Missing list name parameter')); - return; - } - - authorizeListAccess(params.list, req).then(() => { - resolve({ - channelIds: [`timeline:list:${params.list}`], - options: { needsFiltering: false }, - }); - }).catch(() => { - reject(new AuthenticationError('Not authorized to stream this list')); - }); - - break; - default: - reject(new RequestError('Unknown stream type')); - } - }); + }); /** * @param {string} channelName @@ -1315,7 +1495,10 @@ const startServer = async () => { const streamNameFromChannelName = (channelName, params) => { if (channelName === 'list' && params.list) { return [channelName, params.list]; - } else if (['hashtag', 'hashtag:local'].includes(channelName) && params.tag) { + } else if ( + ['hashtag', 'hashtag:local'].includes(channelName) && + params.tag + ) { return [channelName, params.tag]; } else { return [channelName]; @@ -1336,46 +1519,69 @@ const startServer = async () => { * @param {StreamParams} params * @returns {void} */ - const subscribeWebsocketToChannel = ({ websocket, request, logger, subscriptions }, channelName, params) => { - checkScopes(request, logger, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ - channelIds, - options, - }) => { - if (subscriptions[channelIds.join(';')]) { - return; - } + const subscribeWebsocketToChannel = ( + { websocket, request, logger, subscriptions }, + channelName, + params, + ) => { + checkScopes(request, logger, channelName) + .then(() => channelNameToIds(request, channelName, params)) + .then(({ channelIds, options }) => { + if (subscriptions[channelIds.join(';')]) { + return; + } - const onSend = streamToWs(request, websocket, streamNameFromChannelName(channelName, params)); - const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering); + const onSend = streamToWs( + request, + websocket, + streamNameFromChannelName(channelName, params), + ); + const stopHeartbeat = subscriptionHeartbeat(channelIds); + 
const listener = streamFrom( + channelIds, + request, + logger, + onSend, + undefined, + 'websocket', + options.needsFiltering, + ); - connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); + connectedChannels + .labels({ type: 'websocket', channel: channelName }) + .inc(); - subscriptions[channelIds.join(';')] = { - channelName, - listener, - stopHeartbeat, - }; - }).catch(err => { - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + subscriptions[channelIds.join(';')] = { + channelName, + listener, + stopHeartbeat, + }; + }) + .catch((err) => { + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); - logger.error({ err }, 'Websocket subscription error'); + logger.error({ err }, 'Websocket subscription error'); - // If we have a socket that is alive and open still, send the error back to the client: - if (websocket.isAlive && websocket.readyState === websocket.OPEN) { - websocket.send(JSON.stringify({ - error: errorMessage, - status: statusCode - })); - } - }); + // If we have a socket that is alive and open still, send the error back to the client: + if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + websocket.send( + JSON.stringify({ + error: errorMessage, + status: statusCode, + }), + ); + } + }); }; /** * @param {WebSocketSession} session * @param {string[]} channelIds */ - const removeSubscription = ({ request, logger, subscriptions }, channelIds) => { + const removeSubscription = ( + { request, logger, subscriptions }, + channelIds, + ) => { logger.info({ channelIds, accountId: request.accountId }, `Ending stream`); const subscription = subscriptions[channelIds.join(';')]; @@ -1384,11 +1590,13 @@ const startServer = async () => { return; } - channelIds.forEach(channelId => { + channelIds.forEach((channelId) => { unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); }); - connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); + connectedChannels + .labels({ type: 'websocket', channel: subscription.channelName }) + .dec(); subscription.stopHeartbeat(); delete subscriptions[channelIds.join(';')]; @@ -1403,23 +1611,31 @@ const startServer = async () => { const unsubscribeWebsocketFromChannel = (session, channelName, params) => { const { websocket, request, logger } = session; - channelNameToIds(request, channelName, params).then(({ channelIds }) => { - removeSubscription(session, channelIds); - }).catch(err => { - logger.error({err}, 'Websocket unsubscribe error'); + channelNameToIds(request, channelName, params) + .then(({ channelIds }) => { + removeSubscription(session, channelIds); + }) + .catch((err) => { + logger.error({ err }, 'Websocket unsubscribe error'); - // If we have a socket that is alive and open still, send the error back to the client: - if (websocket.isAlive && websocket.readyState === websocket.OPEN) { - // TODO: Use a better error response here - websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); - } - }); + // If we have a socket that is alive and open still, send the error back to the client: + if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + // TODO: Use a better error response here + websocket.send( + JSON.stringify({ error: 'Error unsubscribing from channel' }), + ); + } + }); }; /** * @param {WebSocketSession} session */ - const subscribeWebsocketToSystemChannel = ({ websocket, request, subscriptions }) => { + const subscribeWebsocketToSystemChannel = ({ + websocket, + request, + 
subscriptions, + }) => { const accessTokenChannelId = `timeline:access_token:${request.accessTokenId}`; const systemChannelId = `timeline:system:${request.accountId}`; @@ -1435,15 +1651,13 @@ const startServer = async () => { subscriptions[accessTokenChannelId] = { channelName: 'system', listener, - stopHeartbeat: () => { - }, + stopHeartbeat: () => {}, }; subscriptions[systemChannelId] = { channelName: 'system', listener, - stopHeartbeat: () => { - }, + stopHeartbeat: () => {}, }; connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); @@ -1479,7 +1693,7 @@ const startServer = async () => { ws.on('close', function onWebsocketClose() { const subscriptions = Object.keys(session.subscriptions); - subscriptions.forEach(channelIds => { + subscriptions.forEach((channelIds) => { removeSubscription(session, channelIds.split(';')); }); @@ -1506,7 +1720,10 @@ const startServer = async () => { ws.on('message', (data, isBinary) => { if (isBinary) { log.warn('Received binary data, closing connection'); - ws.close(1003, 'The mastodon streaming server does not support binary messages'); + ws.close( + 1003, + 'The mastodon streaming server does not support binary messages', + ); return; } const message = data.toString('utf8'); @@ -1532,14 +1749,18 @@ const startServer = async () => { const location = req.url && url.parse(req.url, true); if (location && location.query.stream) { - subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); + subscribeWebsocketToChannel( + session, + firstParam(location.query.stream), + location.query, + ); } } wss.on('connection', onConnection); setInterval(() => { - wss.clients.forEach(ws => { + wss.clients.forEach((ws) => { // @ts-ignore if (ws.isAlive === false) { ws.terminate(); @@ -1552,7 +1773,7 @@ const startServer = async () => { }); }, 30000); - attachServerWithConfig(server, address => { + attachServerWithConfig(server, (address) => { logger.info(`Streaming API now listening on ${address}`); }); @@ -1580,7 +1801,7 @@ const startServer = async () => { * @param {function(string): void} [onSuccess] */ const attachServerWithConfig = (server, onSuccess) => { - if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { + if (process.env.SOCKET || (process.env.PORT && isNaN(+process.env.PORT))) { server.listen(process.env.SOCKET || process.env.PORT, () => { if (onSuccess) { fs.chmodSync(server.address(), 0o666); @@ -1588,11 +1809,15 @@ const attachServerWithConfig = (server, onSuccess) => { } }); } else { - server.listen(+(process.env.PORT || 4000), process.env.BIND || '127.0.0.1', () => { - if (onSuccess) { - onSuccess(`${server.address().address}:${server.address().port}`); - } - }); + server.listen( + +(process.env.PORT || 4000), + process.env.BIND || '127.0.0.1', + () => { + if (onSuccess) { + onSuccess(`${server.address().address}:${server.address().port}`); + } + }, + ); } }; diff --git a/streaming/logging.js b/streaming/logging.js index e1c552c22e..181b6df576 100644 --- a/streaming/logging.js +++ b/streaming/logging.js @@ -31,18 +31,21 @@ function sanitizeRequestLog(req) { const log = pinoHttpSerializers.req(req); if (typeof log.url === 'string' && log.url.includes('access_token')) { // Doorkeeper uses SecureRandom.urlsafe_base64 per RFC 6749 / RFC 6750 - log.url = log.url.replace(/(access_token)=([a-zA-Z0-9\-_]+)/gi, '$1=[Redacted]'); + log.url = log.url.replace( + /(access_token)=([a-zA-Z0-9\-_]+)/gi, + '$1=[Redacted]', + ); } return log; } export const logger = pino({ - name: "streaming", + name: 
'streaming', // Reformat the log level to a string: formatters: { level: (label) => { return { - level: label + level: label, }; }, }, @@ -54,17 +57,17 @@ export const logger = pino({ 'req.headers["sec-websocket-protocol"]', 'req.headers.authorization', 'req.headers.cookie', - 'req.query.access_token' - ] - } + 'req.query.access_token', + ], + }, }); export const httpLogger = pinoHttp({ logger, genReqId: generateRequestId, serializers: { - req: sanitizeRequestLog - } + req: sanitizeRequestLog, + }, }); /** @@ -90,11 +93,11 @@ export function createWebsocketLogger(request, resolvedAccount) { return logger.child({ req: { - id: request.id + id: request.id, }, account: { - id: resolvedAccount.accountId ?? null - } + id: resolvedAccount.accountId ?? null, + }, }); } @@ -104,7 +107,10 @@ export function createWebsocketLogger(request, resolvedAccount) { * @param {string} environment */ export function initializeLogLevel(env, environment) { - if (env.LOG_LEVEL && Object.keys(logger.levels.values).includes(env.LOG_LEVEL)) { + if ( + env.LOG_LEVEL && + Object.keys(logger.levels.values).includes(env.LOG_LEVEL) + ) { logger.level = env.LOG_LEVEL; } else if (environment === 'development') { logger.level = 'debug'; diff --git a/streaming/metrics.js b/streaming/metrics.js index a029d778fc..90c5601fec 100644 --- a/streaming/metrics.js +++ b/streaming/metrics.js @@ -55,7 +55,7 @@ export function setupMetrics(channels, pgPool) { const connectedChannels = new metrics.Gauge({ name: 'connected_channels', help: 'The number of channels the streaming server is streaming to', - labelNames: [ 'type', 'channel' ] + labelNames: ['type', 'channel'], }); const redisSubscriptions = new metrics.Gauge({ @@ -65,13 +65,13 @@ export function setupMetrics(channels, pgPool) { const redisMessagesReceived = new metrics.Counter({ name: 'redis_messages_received_total', - help: 'The total number of messages the streaming server has received from redis subscriptions' + help: 'The total number of messages the streaming server has received from redis subscriptions', }); const messagesSent = new metrics.Counter({ name: 'messages_sent_total', help: 'The total number of messages the streaming server sent to clients per connection type', - labelNames: [ 'type' ] + labelNames: ['type'], }); // Prime the gauges so we don't loose metrics between restarts: @@ -80,7 +80,7 @@ export function setupMetrics(channels, pgPool) { connectedClients.set({ type: 'eventsource' }, 0); // For each channel, initialize the gauges at zero; There's only a finite set of channels available - channels.forEach(( channel ) => { + channels.forEach((channel) => { connectedChannels.set({ type: 'websocket', channel }, 0); connectedChannels.set({ type: 'eventsource', channel }, 0); }); diff --git a/streaming/utils.js b/streaming/utils.js index 4610bf660d..6b8e13ffff 100644 --- a/streaming/utils.js +++ b/streaming/utils.js @@ -1,16 +1,6 @@ // @ts-check -const FALSE_VALUES = [ - false, - 0, - '0', - 'f', - 'F', - 'false', - 'FALSE', - 'off', - 'OFF', -]; +const FALSE_VALUES = [false, 0, '0', 'f', 'F', 'false', 'FALSE', 'off', 'OFF']; /** * @param {any} value @@ -24,8 +14,10 @@ export function isTruthy(value) { * See app/lib/ascii_folder.rb for the canon definitions * of these constants */ -const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂ㥹ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇňʼnŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž'; -const EQUIVALENT_ASCII_CHARS = 
'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz'; +const NON_ASCII_CHARS = + 'ÀÁÂÃÄÅàáâãäåĀāĂ㥹ÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇňʼnŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž'; +const EQUIVALENT_ASCII_CHARS = + 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz'; /** * @param {string} str @@ -34,7 +26,7 @@ const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEe export function foldToASCII(str) { const regex = new RegExp(NON_ASCII_CHARS.split('').join('|'), 'g'); - return str.replace(regex, function(match) { + return str.replace(regex, function (match) { const index = NON_ASCII_CHARS.indexOf(match); return EQUIVALENT_ASCII_CHARS[index]; }); @@ -45,7 +37,10 @@ export function foldToASCII(str) { * @returns {string} */ export function normalizeHashtag(str) { - return foldToASCII(str.normalize('NFKC').toLowerCase()).replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, ''); + return foldToASCII(str.normalize('NFKC').toLowerCase()).replace( + /[^\p{L}\p{N}_\u00b7\u200c]/gu, + '', + ); } /**
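
The change above is mechanical: streaming/*.js is un-ignored in .prettierignore and then rewritten by Prettier. For reference, the style visible in the diff (single quotes, trailing commas on multi-line literals and argument lists, parenthesized arrow parameters, wrapping at 80 columns) corresponds to a configuration along the following lines. This is a minimal sketch that assumes the repository leans on Prettier's defaults; the project's real Prettier config is not shown in this diff and may differ.

// prettier.config.js (illustrative sketch only, not part of this diff)
/** @type {import('prettier').Config} */
module.exports = {
  singleQuote: true, // "RequestError" becomes 'RequestError'
  trailingComma: 'all', // the trailing commas added throughout the diff
  arrowParens: 'always', // `channel => ...` becomes `(channel) => ...`
  printWidth: 80, // Prettier's default; drives the call-site wrapping above
};

With a config like this, the reformat reduces to a single command such as `npx prettier --write streaming/`, and the `/* eslint-disable indent */` pragma added at the top of streaming/index.js presumably keeps ESLint's indent rule from fighting the deeper indentation Prettier produces for the wrapped promise chains.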
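
The densest logic touched above is the custom-filter regexp built inside streamFrom: each keyword is regex-escaped, whole_word keywords gain \b anchors only where the keyword actually starts or ends with a word character, and all keywords are OR-joined into one case-insensitive pattern. Here is the same logic pulled out into a standalone helper for illustration (toRegexp is a hypothetical name, not something defined in this diff):

// Hypothetical helper mirroring the req.cachedFilters[key].regexp
// construction in streamFrom above.
const toRegexp = (keywords) =>
  new RegExp(
    keywords
      .map(([keyword, whole_word]) => {
        // Escape regex metacharacters in the keyword:
        let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

        // Word boundaries are added only next to word characters,
        // since \b only matches at a word/non-word transition:
        if (whole_word) {
          if (/^[\w]/.test(expr)) expr = `\\b${expr}`;
          if (/[\w]$/.test(expr)) expr = `${expr}\\b`;
        }

        return expr;
      })
      .join('|'),
    'i',
  );

// toRegexp([['cat', true], ['c++', false]]) yields /\bcat\b|c\+\+/i:
// 'My CAT sleeps' matches, 'concatenate' does not.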