Remove prefix from streaming server

This commit is contained in:
Matt Jankowski 2024-02-02 16:01:10 -05:00
parent e8cdb5b40d
commit 446f36aa3f
1 changed files with 11 additions and 18 deletions

View File

@ -228,7 +228,6 @@ const pgConfigFromEnv = (env) => {
/**
* @typedef RedisConfiguration
* @property {import('ioredis').RedisOptions} redisParams
* @property {string} redisPrefix
* @property {string|undefined} redisUrl
*/
@ -237,10 +236,6 @@ const pgConfigFromEnv = (env) => {
* @returns {RedisConfiguration} configuration for the Redis connection
*/
const redisConfigFromEnv = (env) => {
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
// which means we can't use it. But this is something that should be looked into.
const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
let redisPort = parseIntFromEnv(env.REDIS_PORT, 6379, 'REDIS_PORT');
let redisDatabase = parseIntFromEnv(env.REDIS_DB, 0, 'REDIS_DB');
@ -259,7 +254,6 @@ const redisConfigFromEnv = (env) => {
return {
redisParams,
redisPrefix,
redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined,
};
};
@ -382,7 +376,6 @@ const startServer = async () => {
const redisConfig = redisConfigFromEnv(process.env);
const redisSubscribeClient = await createRedisClient(redisConfig);
const redisClient = await createRedisClient(redisConfig);
const { redisPrefix } = redisConfig;
const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
// TODO: migrate all metrics to metrics.X.method() instead of just X.method()
@ -422,7 +415,7 @@ const startServer = async () => {
const interval = 6 * 60;
const tellSubscribed = () => {
channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
channels.forEach(channel => redisClient.set(`subscribed:${channel}`, '1', 'EX', interval * 3));
};
tellSubscribed();
@ -443,7 +436,7 @@ const startServer = async () => {
const callbacks = subs[channel];
logger.debug(`New message on channel ${redisPrefix}${channel}`);
logger.debug(`New message on channel ${channel}`);
if (!callbacks) {
return;
@ -697,14 +690,14 @@ const startServer = async () => {
});
res.on('close', () => {
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
unsubscribe(`${accessTokenChannelId}`, listener);
unsubscribe(`${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
});
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener);
subscribe(`${accessTokenChannelId}`, listener);
subscribe(`${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
};
@ -1034,11 +1027,11 @@ const startServer = async () => {
};
channelIds.forEach(id => {
subscribe(`${redisPrefix}${id}`, listener);
subscribe(`${id}`, listener);
});
if (typeof attachCloseHandler === 'function') {
attachCloseHandler(channelIds.map(id => `${redisPrefix}${id}`), listener);
attachCloseHandler(channelIds.map(id => `${id}`), listener);
}
return listener;
@ -1385,7 +1378,7 @@ const startServer = async () => {
}
channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
unsubscribe(`${channelId}`, subscription.listener);
});
connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
@ -1429,8 +1422,8 @@ const startServer = async () => {
},
});
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener);
subscribe(`${accessTokenChannelId}`, listener);
subscribe(`${systemChannelId}`, listener);
subscriptions[accessTokenChannelId] = {
channelName: 'system',