This commit is contained in:
mgabdev
2020-03-04 17:26:01 -05:00
parent 143725b5bd
commit 567894f614
109 changed files with 976 additions and 1643 deletions

View File

@@ -89,27 +89,27 @@ const startWorker = (workerId) => {
const pgConfigs = {
development: {
user: process.env.DB_USER || pg.defaults.user,
user: process.env.DB_USER || pg.defaults.user,
password: process.env.DB_PASS || pg.defaults.password,
database: process.env.DB_NAME || 'gabsocial_development',
host: process.env.DB_HOST || pg.defaults.host,
port: process.env.DB_PORT || pg.defaults.port,
max: 10,
host: process.env.DB_HOST || pg.defaults.host,
port: process.env.DB_PORT || pg.defaults.port,
max: 10,
},
production: {
user: process.env.DB_USER || 'gabsocial',
user: process.env.DB_USER || 'gabsocial',
password: process.env.DB_PASS || '',
database: process.env.DB_NAME || 'gabsocial_production',
host: process.env.DB_HOST || 'localhost',
port: process.env.DB_PORT || 5432,
max: 10,
host: process.env.DB_HOST || 'localhost',
port: process.env.DB_PORT || 5432,
max: 10,
},
};
if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
pgConfigs.development.ssl = true;
pgConfigs.production.ssl = true;
pgConfigs.production.ssl = true;
}
const app = express();
@@ -120,9 +120,9 @@ const startWorker = (workerId) => {
const redisNamespace = process.env.REDIS_NAMESPACE || null;
const redisParams = {
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
db: process.env.REDIS_DB || 0,
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
db: process.env.REDIS_DB || 0,
password: process.env.REDIS_PASSWORD,
};
@@ -150,12 +150,12 @@ const startWorker = (workerId) => {
});
const subscriptionHeartbeat = (channel) => {
const interval = 6*60;
const interval = 6 * 60;
const tellSubscribed = () => {
redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3);
};
tellSubscribed();
const heartbeat = setInterval(tellSubscribed, interval*1000);
const heartbeat = setInterval(tellSubscribed, interval * 1000);
return () => {
clearInterval(heartbeat);
};
@@ -327,7 +327,7 @@ const startWorker = (workerId) => {
accountFromRequest(req, next, authRequired, allowedScopes);
};
const errorMiddleware = (err, req, res, {}) => {
const errorMiddleware = (err, req, res, { }) => {
log.error(req.requestId, err.toString());
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
@@ -385,8 +385,8 @@ const startWorker = (workerId) => {
const { event, payload, queued_at } = JSON.parse(message);
const transmit = () => {
const now = new Date().getTime();
const delta = now - queued_at;
const now = new Date().getTime();
const delta = now - queued_at;
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
@@ -408,9 +408,9 @@ const startWorker = (workerId) => {
return;
}
const unpackedPayload = payload;
const unpackedPayload = payload;
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
const accountDomain = unpackedPayload.account.acct.split('@')[1];
const accountDomain = unpackedPayload.account.acct.split('@')[1];
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
@@ -550,14 +550,14 @@ const startWorker = (workerId) => {
app.get('/api/v1/streaming/public', (req, res) => {
const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
app.get('/api/v1/streaming/public/local', (req, res) => {
const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
@@ -607,84 +607,84 @@ const startWorker = (workerId) => {
wss.on('connection', (ws, req) => {
const location = url.parse(req.url, true);
req.requestId = uuid.v4();
req.requestId = uuid.v4();
req.remoteAddress = ws._socket.remoteAddress;
let channel;
switch(location.query.stream) {
case 'statuscard':
channel = `statuscard:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user':
channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
break;
case 'public':
streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:local':
streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:media':
streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:local:media':
streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'direct':
channel = `timeline:direct:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
break;
case 'hashtag':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'hashtag:local':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
const listId = location.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
switch (location.query.stream) {
case 'statuscard':
channel = `statuscard:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user':
channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
break;
case 'public':
streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:local':
streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:media':
streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'public:local:media':
streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'direct':
channel = `timeline:direct:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
break;
case 'hashtag':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
case 'group':
const groupId = location.query.group;
authorizeGroupAccess(groupId, req, authorized => {
if (!authorized) {
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'hashtag:local':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
channel = `timeline:group:${groupId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
default:
ws.close();
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
const listId = location.query.list;
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
ws.close();
return;
}
channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
case 'group':
const groupId = location.query.group;
authorizeGroupAccess(groupId, req, authorized => {
if (!authorized) {
ws.close();
return;
}
channel = `timeline:group:${groupId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
default:
ws.close();
}
});