const os = require('os');
const throng = require('throng');
const dotenv = require('dotenv');
const express = require('express');
const http = require('http');
const redis = require('redis');
const pg = require('pg');
const log = require('npmlog');
const url = require('url');
const { WebSocketServer } = require('@clusterws/cws');
const uuid = require('uuid');
const fs = require('fs');
// Runtime environment name; decides which env file is loaded below.
const env = process.env.NODE_ENV || 'development';

// Load the env file BEFORE reading any other setting, so that values defined
// only in .env / .env.production are visible to the reads that follow.
dotenv.config({
  path: env === 'production' ? '.env.production' : '.env',
});

// True when either WHITELIST_MODE or AUTHORIZED_FETCH is exactly 'true'.
// Must be evaluated after dotenv.config(): reading it earlier ignored these
// flags whenever they were set in the env file rather than the process
// environment.
const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

// Logger verbosity; defaults to 'verbose' when LOG_LEVEL is unset or empty.
log.level = process.env.LOG_LEVEL || 'verbose';
/**
 * Converts a database connection URL into a plain configuration object.
 *
 * Only the parts present in the URL are set on the result:
 *   - `user` / `password` from the auth section,
 *   - `host` from the hostname,
 *   - `port` (kept as the string produced by the URL parser),
 *   - `database` from the first path segment,
 *   - `ssl: true` when the `ssl` query parameter is exactly 'true' or '1'.
 *
 * @param {string} [dbUrl] Connection URL, e.g. `postgres://user:pass@host:5432/db?ssl=true`.
 * @returns {Object} The derived configuration; an empty object when `dbUrl` is falsy.
 */
const dbUrlToConfig = (dbUrl) => {
  if (!dbUrl) {
    return {};
  }

  const params = url.parse(dbUrl, true);
  const config = {};

  if (params.auth) {
    // The parser hands back the auth section already percent-decoded, so a
    // password may itself contain ':'. Only the FIRST colon separates the
    // user from the password; everything after it belongs to the password.
    const [user, ...passwordParts] = params.auth.split(':');

    config.user = user;
    // With no colon at all the password stays explicitly undefined, matching
    // what the previous two-element destructuring produced.
    config.password = passwordParts.length > 0 ? passwordParts.join(':') : undefined;
  }

  if (params.hostname) {
    config.host = params.hostname;
  }

  if (params.port) {
    config.port = params.port;
  }

  if (params.pathname) {
    // pathname starts with '/', so index 1 is the first real segment.
    config.database = params.pathname.split('/')[1];
  }

  const ssl = params.query && params.query.ssl;

  if (ssl === 'true' || ssl === '1') {
    config.ssl = true;
  }

  return config;
};
/**
 * Creates a Redis client from a base configuration and an optional URL.
 *
 *   - No URL: the client is created from `defaultConfig` alone.
 *   - `unix://<path>`: the client is created with the socket path (the URL
 *     with its 'unix://' prefix removed) plus `defaultConfig`.
 *   - Any other URL: the client is created from `defaultConfig` extended
 *     with a `url` property.
 *
 * `defaultConfig` is never modified, so the same object can safely be reused
 * across several calls.
 *
 * @param {Object} defaultConfig Base options passed to `redis.createClient`.
 * @param {string} [redisUrl] Redis connection URL or `unix://` socket URL.
 * @returns {*} Whatever `redis.createClient` returns.
 */
const redisUrlToClient = (defaultConfig, redisUrl) => {
  if (!redisUrl) {
    return redis.createClient(defaultConfig);
  }

  if (redisUrl.startsWith('unix://')) {
    // 'unix://'.length === 7; what remains is the socket path.
    return redis.createClient(redisUrl.slice(7), defaultConfig);
  }

  // Merge into a fresh object: assigning onto defaultConfig directly would
  // leak the `url` property into the caller's shared configuration.
  return redis.createClient(Object.assign({}, defaultConfig, {
    url: redisUrl,
  }));
};
// Number of worker processes. A STREAMING_CLUSTER_NUM that converts to a
// non-zero number is used as-is; otherwise development gets a single worker
// and every other environment gets one fewer than the CPU count (minimum 1).
const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM) || (
  env === 'development'
    ? 1
    : Math.max(1, os.cpus().length - 1)
);
/**
 * Startup hook for the master process.
 *
 * Emits a migration warning when no SOCKET is configured and PORT holds a
 * non-empty value that is not numeric, then logs how many workers will be
 * started.
 */
const startMaster = () => {
  const { SOCKET: socketSetting, PORT: portSetting } = process.env;
  const portIsNotNumeric = Number.isNaN(Number(portSetting));

  if (!socketSetting && portSetting && portIsNotNumeric) {
    log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  }

  log.info(`Starting streaming API server master with ${numWorkers} workers`);
};
const startWorker = (workerId) => {
log.info(`Starting worker ${workerId}`);
const pgConfigs = {
development: {
user: process.env.DB_USER || pg.defaults.user,
password: process.env.DB_PASS || pg.defaults