Removed streaming for group, group_collection, hashtag, link timelines

• Removed:
- streaming for group, group_collection, hashtag, link timelines
This commit is contained in:
mgabdev 2020-11-04 21:09:27 -06:00
parent 880dad7b5a
commit f2f3382677
7 changed files with 3 additions and 210 deletions

View File

@ -52,11 +52,7 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null,
export const connectUserStream = () => connectTimelineStream('home', 'user'); export const connectUserStream = () => connectTimelineStream('home', 'user');
export const connectProStream = () => connectTimelineStream('pro', 'pro'); export const connectProStream = () => connectTimelineStream('pro', 'pro');
export const connectLinkStream = (linkId, accept) => connectTimelineStream(`link:${linkId}`, `link&linkId=${linkId}`, null, accept);
export const connectHashtagStream = (id, tag, accept) => connectTimelineStream(`hashtag:${id}`, `hashtag&tag=${tag}`, null, accept);
export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`); export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`);
export const connectGroupStream = id => connectTimelineStream(`group:${id}`, `group&group=${id}`);
export const connectGroupCollectionStream = (collectionType, sortBy) => connectTimelineStream(`group_collection:${collectionType}:${sortBy}`, `group_collection&group_collection=${collectionType}`);
export const connectStatusUpdateStream = () => { export const connectStatusUpdateStream = () => {
return connectStream('statuscard', null, (dispatch, getState) => { return connectStream('statuscard', null, (dispatch, getState) => {

View File

@ -4,7 +4,6 @@ import { connect } from 'react-redux'
import { injectIntl, defineMessages } from 'react-intl' import { injectIntl, defineMessages } from 'react-intl'
import { List as ImmutableList } from 'immutable' import { List as ImmutableList } from 'immutable'
import { me } from '../initial_state' import { me } from '../initial_state'
import { connectGroupCollectionStream } from '../actions/streaming'
import { import {
clearTimeline, clearTimeline,
expandGroupCollectionTimeline, expandGroupCollectionTimeline,
@ -30,28 +29,6 @@ class GroupCollectionTimeline extends React.PureComponent {
loadCount: 0, loadCount: 0,
} }
_unsubscribe() {
if (this.disconnect && !!me) {
this.disconnect()
this.disconnect = null
}
}
_subscribe = () => {
if (!!me) {
const {
collectionType,
sortByValue,
sortByTopValue,
} = this.props
if (collectionType === 'member' && sortByValue === 'new') {
const sortBy = getSortBy(sortByValue, sortByTopValue)
this.disconnect = this.props.onConnectGroupCollectionStream(collectionType, sortBy)
}
}
}
componentDidMount() { componentDidMount() {
const { const {
collectionType, collectionType,
@ -66,8 +43,6 @@ class GroupCollectionTimeline extends React.PureComponent {
} else { } else {
const sortBy = getSortBy(sortByValue, sortByTopValue) const sortBy = getSortBy(sortByValue, sortByTopValue)
this.props.onExpandGroupCollectionTimeline(collectionType, { sortBy }) this.props.onExpandGroupCollectionTimeline(collectionType, { sortBy })
this._subscribe()
} }
} }
@ -75,17 +50,11 @@ class GroupCollectionTimeline extends React.PureComponent {
if (prevProps.sortByValue !== this.props.sortByValue || if (prevProps.sortByValue !== this.props.sortByValue ||
prevProps.sortByTopValue !== this.props.sortByTopValue || prevProps.sortByTopValue !== this.props.sortByTopValue ||
prevProps.collectionType !== this.props.collectionType) { prevProps.collectionType !== this.props.collectionType) {
this._unsubscribe()
this._subscribe()
this.props.onClearTimeline(`group_collection:${prevProps.collectionType}`) this.props.onClearTimeline(`group_collection:${prevProps.collectionType}`)
this.handleLoadMore() this.handleLoadMore()
} }
} }
componentWillUnmount() {
this._unsubscribe()
}
handleLoadMore = (maxId) => { handleLoadMore = (maxId) => {
const { const {
collectionType, collectionType,
@ -162,9 +131,6 @@ const mapStateToProps = (state) => {
} }
const mapDispatchToProps = (dispatch) => ({ const mapDispatchToProps = (dispatch) => ({
onConnectGroupCollectionStream(collectionType, sortBy) {
dispatch(connectGroupCollectionStream(collectionType, sortBy))
},
onClearTimeline(timeline) { onClearTimeline(timeline) {
dispatch(clearTimeline(timeline)) dispatch(clearTimeline(timeline))
}, },
@ -181,7 +147,6 @@ const mapDispatchToProps = (dispatch) => ({
GroupCollectionTimeline.propTypes = { GroupCollectionTimeline.propTypes = {
params: PropTypes.object.isRequired, params: PropTypes.object.isRequired,
onConnectGroupCollectionStream: PropTypes.func.isRequired,
onClearTimeline: PropTypes.func.isRequired, onClearTimeline: PropTypes.func.isRequired,
onExpandGroupCollectionTimeline: PropTypes.func.isRequired, onExpandGroupCollectionTimeline: PropTypes.func.isRequired,
setFeaturedTop: PropTypes.func.isRequired, setFeaturedTop: PropTypes.func.isRequired,

View File

@ -7,7 +7,6 @@ import { List as ImmutableList } from 'immutable'
import { injectIntl, defineMessages } from 'react-intl' import { injectIntl, defineMessages } from 'react-intl'
import { me } from '../initial_state' import { me } from '../initial_state'
import getSortBy from '../utils/group_sort_by' import getSortBy from '../utils/group_sort_by'
import { connectGroupStream } from '../actions/streaming'
import { import {
clearTimeline, clearTimeline,
expandGroupTimeline, expandGroupTimeline,
@ -46,10 +45,6 @@ class GroupTimeline extends ImmutablePureComponent {
this.props.onExpandGroupFeaturedTimeline(groupId) this.props.onExpandGroupFeaturedTimeline(groupId)
this.props.onExpandGroupTimeline(groupId, { sortBy, onlyMedia }) this.props.onExpandGroupTimeline(groupId, { sortBy, onlyMedia })
if (!!me) {
this.disconnect = this.props.onConnectGroupStream(groupId)
}
} }
} }
@ -67,13 +62,6 @@ class GroupTimeline extends ImmutablePureComponent {
} }
} }
componentWillUnmount() {
if (this.disconnect && !!me) {
this.disconnect()
this.disconnect = null
}
}
handleLoadMore = (maxId) => { handleLoadMore = (maxId) => {
const { const {
groupId, groupId,
@ -142,9 +130,6 @@ const mapStateToProps = (state, props) => ({
}) })
const mapDispatchToProps = (dispatch) => ({ const mapDispatchToProps = (dispatch) => ({
onConnectGroupStream(groupId) {
dispatch(connectGroupStream(groupId))
},
onClearTimeline(timelineId) { onClearTimeline(timelineId) {
dispatch(clearTimeline(timelineId)) dispatch(clearTimeline(timelineId))
}, },
@ -167,7 +152,6 @@ GroupTimeline.propTypes = {
]), ]),
groupId: PropTypes.string, groupId: PropTypes.string,
intl: PropTypes.object.isRequired, intl: PropTypes.object.isRequired,
onConnectGroupStream: PropTypes.func.isRequired,
onClearTimeline: PropTypes.func.isRequired, onClearTimeline: PropTypes.func.isRequired,
onExpandGroupTimeline: PropTypes.func.isRequired, onExpandGroupTimeline: PropTypes.func.isRequired,
onExpandGroupFeaturedTimeline: PropTypes.func.isRequired, onExpandGroupFeaturedTimeline: PropTypes.func.isRequired,

View File

@ -4,13 +4,10 @@ import { connect } from 'react-redux'
import { FormattedMessage } from 'react-intl' import { FormattedMessage } from 'react-intl'
import isEqual from 'lodash.isequal' import isEqual from 'lodash.isequal'
import { expandHashtagTimeline, clearTimeline } from '../actions/timelines' import { expandHashtagTimeline, clearTimeline } from '../actions/timelines'
import { connectHashtagStream } from '../actions/streaming'
import StatusList from '../components/status_list' import StatusList from '../components/status_list'
class HashtagTimeline extends React.PureComponent { class HashtagTimeline extends React.PureComponent {
disconnects = [];
title = () => { title = () => {
const title = [this.props.params.id] const title = [this.props.params.id]
@ -66,31 +63,10 @@ class HashtagTimeline extends React.PureComponent {
} }
} }
_subscribe (dispatch, id, tags = {}) {
const any = (tags.any || []).map(tag => tag.value)
const all = (tags.all || []).map(tag => tag.value)
const none = (tags.none || []).map(tag => tag.value);
[id, ...any].map(tag => {
this.disconnects.push(dispatch(connectHashtagStream(id, tag, status => {
const tags = status.tags.map(tag => tag.name)
return all.filter(tag => tags.includes(tag)).length === all.length &&
none.filter(tag => tags.includes(tag)).length === 0
})))
})
}
_unsubscribe () {
this.disconnects.map(disconnect => disconnect())
this.disconnects = []
}
componentDidMount () { componentDidMount () {
const { dispatch } = this.props const { dispatch } = this.props
const { id, tags } = this.props.params const { id, tags } = this.props.params
this._subscribe(dispatch, id, tags)
dispatch(expandHashtagTimeline(id, { tags })) dispatch(expandHashtagTimeline(id, { tags }))
} }
@ -99,18 +75,12 @@ class HashtagTimeline extends React.PureComponent {
const { id, tags } = nextProps.params const { id, tags } = nextProps.params
if (id !== params.id || !isEqual(tags, params.tags)) { if (id !== params.id || !isEqual(tags, params.tags)) {
this._unsubscribe()
this._subscribe(dispatch, id, tags)
this.props.dispatch(clearTimeline(`hashtag:${id}`)) this.props.dispatch(clearTimeline(`hashtag:${id}`))
this.props.dispatch(expandHashtagTimeline(id, { tags })) this.props.dispatch(expandHashtagTimeline(id, { tags }))
} }
} }
componentWillUnmount () { handleLoadMore = (maxId) => {
this._unsubscribe()
}
handleLoadMore = maxId => {
const { id, tags } = this.props.params const { id, tags } = this.props.params
this.props.dispatch(expandHashtagTimeline(id, { maxId, tags })) this.props.dispatch(expandHashtagTimeline(id, { maxId, tags }))
} }

View File

@ -4,7 +4,6 @@ import { connect } from 'react-redux'
import ImmutablePropTypes from 'react-immutable-proptypes' import ImmutablePropTypes from 'react-immutable-proptypes'
import ImmutablePureComponent from 'react-immutable-pure-component' import ImmutablePureComponent from 'react-immutable-pure-component'
import { FormattedMessage } from 'react-intl' import { FormattedMessage } from 'react-intl'
import { connectLinkStream } from '../actions/streaming'
import { expandLinkTimeline } from '../actions/timelines' import { expandLinkTimeline } from '../actions/timelines'
import { fetchLinkCard } from '../actions/links' import { fetchLinkCard } from '../actions/links'
import { openModal } from '../actions/modal' import { openModal } from '../actions/modal'
@ -25,13 +24,8 @@ class LinkTimeline extends ImmutablePureComponent {
this.handleConnect(this.props.params.id) this.handleConnect(this.props.params.id)
} }
componentWillUnmount() {
this.handleDisconnect()
}
componentWillReceiveProps(nextProps) { componentWillReceiveProps(nextProps) {
if (nextProps.params.id !== this.props.params.id) { if (nextProps.params.id !== this.props.params.id) {
this.handleDisconnect()
this.handleConnect(nextProps.params.id) this.handleConnect(nextProps.params.id)
} }
} }
@ -41,15 +35,6 @@ class LinkTimeline extends ImmutablePureComponent {
dispatch(fetchLinkCard(id)) dispatch(fetchLinkCard(id))
dispatch(expandLinkTimeline(id)) dispatch(expandLinkTimeline(id))
this.disconnect = dispatch(connectLinkStream(id))
}
handleDisconnect() {
if (this.disconnect) {
this.disconnect()
this.disconnect = null
}
} }
handleLoadMore = (maxId) => { handleLoadMore = (maxId) => {

View File

@ -16,13 +16,10 @@ class FanOutOnWriteService < BaseService
deliver_to_self(status) if status.account.local? deliver_to_self(status) if status.account.local?
deliver_to_followers(status) deliver_to_followers(status)
deliver_to_lists(status) deliver_to_lists(status)
# deliver_to_group(status)
end end
return if status.account.silenced? || !status.public_visibility? || status.reblog? return if status.account.silenced? || !status.public_visibility? || status.reblog?
deliver_to_hashtags(status)
return if status.reply? && status.in_reply_to_account_id != status.account_id return if status.reply? && status.in_reply_to_account_id != status.account_id
@ -48,16 +45,6 @@ class FanOutOnWriteService < BaseService
end end
end end
def deliver_to_group_members(status)
# Rails.logger.debug "Delivering status #{status.id} to group members #{status.group.id}"
# status.group.accounts_for_local_distribution.select(:id).reorder(nil).find_in_batches do |members|
# FeedInsertWorker.push_bulk(members) do |member|
# [status.id, member.id, :home]
# end
# end
end
def deliver_to_lists(status) def deliver_to_lists(status)
Rails.logger.debug "Delivering status #{status.id} to lists" Rails.logger.debug "Delivering status #{status.id} to lists"
@ -68,16 +55,6 @@ class FanOutOnWriteService < BaseService
end end
end end
def deliver_to_group(status)
# return if status.group_id.nil?
# Rails.logger.debug "Delivering status #{status.id} to group"
# Redis.current.publish("timeline:group:#{status.group_id}", @payload)
# deliver_to_group_members(status)
end
def deliver_to_mentioned_followers(status) def deliver_to_mentioned_followers(status)
Rails.logger.debug "Delivering status #{status.id} to limited followers" Rails.logger.debug "Delivering status #{status.id} to limited followers"
@ -91,15 +68,6 @@ class FanOutOnWriteService < BaseService
@payload = Oj.dump(event: :update, payload: @payload) @payload = Oj.dump(event: :update, payload: @payload)
end end
def deliver_to_hashtags(status)
Rails.logger.debug "Delivering status #{status.id} to hashtags"
status.tags.pluck(:name).each do |hashtag|
Redis.current.publish("timeline:hashtag:#{hashtag}", @payload)
Redis.current.publish("timeline:hashtag:#{hashtag}:local", @payload) if status.local?
end
end
def deliver_to_pro(status) def deliver_to_pro(status)
Rails.logger.debug "Delivering status #{status.id} to pro timeline" Rails.logger.debug "Delivering status #{status.id} to pro timeline"

View File

@ -266,10 +266,7 @@ const startWorker = (workerId) => {
accountFromToken(token, allowedScopes, req, next); accountFromToken(token, allowedScopes, req, next);
}; };
const PUBLIC_STREAMS = [ const PUBLIC_STREAMS = [];
'hashtag',
'hashtag:local',
];
const wsVerifyClient = (info, cb) => { const wsVerifyClient = (info, cb) => {
const location = url.parse(info.req.url, true); const location = url.parse(info.req.url, true);
@ -295,10 +292,7 @@ const startWorker = (workerId) => {
}, authRequired, allowedScopes); }, authRequired, allowedScopes);
}; };
const PUBLIC_ENDPOINTS = [ const PUBLIC_ENDPOINTS = [];
'/api/v1/streaming/hashtag',
'/api/v1/streaming/hashtag/local',
];
const authenticationMiddleware = (req, res, next) => { const authenticationMiddleware = (req, res, next) => {
if (req.method === 'OPTIONS') { if (req.method === 'OPTIONS') {
@ -542,33 +536,6 @@ const startWorker = (workerId) => {
streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true); streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
}); });
app.get('/api/v1/streaming/direct', (req, res) => {
const channel = `timeline:direct:${req.accountId}`;
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
});
app.get('/api/v1/streaming/hashtag', (req, res) => {
const { tag } = req.query;
if (!tag || tag.length === 0) {
httpNotFound(res);
return;
}
streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
app.get('/api/v1/streaming/hashtag/local', (req, res) => {
const { tag } = req.query;
if (!tag || tag.length === 0) {
httpNotFound(res);
return;
}
streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
app.get('/api/v1/streaming/list', (req, res) => { app.get('/api/v1/streaming/list', (req, res) => {
const listId = req.query.list; const listId = req.query.list;
@ -611,34 +578,6 @@ const startWorker = (workerId) => {
case 'user:notification': case 'user:notification':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true); streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
break; break;
case 'direct':
channel = `timeline:direct:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
break;
case 'hashtag':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'link':
if (!location.query.linkId || location.query.linkId.length === 0) {
ws.close();
return;
}
streamFrom(`timeline:link:${location.query.linkId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'hashtag:local':
if (!location.query.tag || location.query.tag.length === 0) {
ws.close();
return;
}
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list': case 'list':
const listId = location.query.list; const listId = location.query.list;
@ -652,20 +591,6 @@ const startWorker = (workerId) => {
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
}); });
break; break;
case 'group':
const groupId = location.query.group;
authorizeGroupAccess(groupId, req, authorized => {
if (!authorized) {
ws.close();
return;
}
channel = `timeline:group:${groupId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
default: default:
ws.close(); ws.close();
} }