Commiting
This commit is contained in:
@@ -1,60 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DeliveryWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
STOPLIGHT_FAILURE_THRESHOLD = 10
|
||||
STOPLIGHT_COOLDOWN = 60
|
||||
|
||||
sidekiq_options queue: 'push', retry: 0, dead: true
|
||||
|
||||
HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
|
||||
|
||||
def perform(json, source_account_id, inbox_url, options = {})
|
||||
return if DeliveryFailureTracker.unavailable?(inbox_url)
|
||||
|
||||
@options = options.with_indifferent_access
|
||||
@json = json
|
||||
@source_account = Account.find(source_account_id)
|
||||
@inbox_url = inbox_url
|
||||
|
||||
perform_request
|
||||
|
||||
failure_tracker.track_success!
|
||||
rescue => e
|
||||
failure_tracker.track_failure!
|
||||
raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_request
|
||||
request = Request.new(:post, @inbox_url, body: @json)
|
||||
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
|
||||
request.add_headers(HEADERS)
|
||||
end
|
||||
|
||||
def perform_request
|
||||
light = Stoplight(@inbox_url) do
|
||||
build_request.perform do |response|
|
||||
raise GabSocial::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
|
||||
end
|
||||
end
|
||||
|
||||
light.with_threshold(STOPLIGHT_FAILURE_THRESHOLD)
|
||||
.with_cool_off_time(STOPLIGHT_COOLDOWN)
|
||||
.run
|
||||
end
|
||||
|
||||
def response_successful?(response)
|
||||
(200...300).cover?(response.code)
|
||||
end
|
||||
|
||||
def response_error_unsalvageable?(response)
|
||||
(400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)
|
||||
end
|
||||
|
||||
def failure_tracker
|
||||
@failure_tracker ||= DeliveryFailureTracker.new(@inbox_url)
|
||||
end
|
||||
end
|
||||
@@ -1,65 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DistributePollUpdateWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', unique: :until_executed, retry: 0
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.account
|
||||
|
||||
return unless @status.preloadable_poll
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
relay! if relayable?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def relayable?
|
||||
@status.public_visibility?
|
||||
end
|
||||
|
||||
def inboxes
|
||||
return @inboxes if defined?(@inboxes)
|
||||
|
||||
@inboxes = [@status.mentions, @status.reblogs, @status.preloadable_poll.votes].flat_map do |relation|
|
||||
relation.includes(:account).map do |record|
|
||||
record.account.preferred_inbox_url if !record.account.local? && record.account.activitypub?
|
||||
end
|
||||
end
|
||||
|
||||
@inboxes.concat(@account.followers.inboxes) unless @status.direct_visibility?
|
||||
@inboxes.uniq!
|
||||
@inboxes.compact!
|
||||
@inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::UpdatePollSerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
|
||||
def relay!
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,65 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', retry: 0, dead: true
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.account
|
||||
|
||||
return if skip_distribution?
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
relay! if relayable?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def skip_distribution?
|
||||
@status.direct_visibility? || @status.limited_visibility?
|
||||
end
|
||||
|
||||
def relayable?
|
||||
@status.public_visibility?
|
||||
end
|
||||
|
||||
def inboxes
|
||||
# Deliver the status to all followers.
|
||||
# If the status is a reply to another local status, also forward it to that
|
||||
# status' authors' followers.
|
||||
@inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable?
|
||||
@account.followers.or(@status.thread.account.followers).inboxes
|
||||
else
|
||||
@account.followers.inboxes
|
||||
end
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::ActivitySerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
|
||||
def relay!
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,14 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::FetchRepliesWorker
|
||||
include Sidekiq::Worker
|
||||
include ExponentialBackoff
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 3
|
||||
|
||||
def perform(parent_status_id, replies_uri)
|
||||
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,5 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::LowPriorityDeliveryWorker < ActivityPub::DeliveryWorker
|
||||
sidekiq_options queue: 'pull', retry: 0, dead: true
|
||||
end
|
||||
@@ -1,15 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::PostUpgradeWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull'
|
||||
|
||||
def perform(domain)
|
||||
Account.where(domain: domain)
|
||||
.where(protocol: :ostatus)
|
||||
.where.not(last_webfingered_at: nil)
|
||||
.in_batches
|
||||
.update_all(last_webfingered_at: nil)
|
||||
end
|
||||
end
|
||||
@@ -1,13 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::ProcessingWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options backtrace: true, retry: 0, dead: true
|
||||
|
||||
def perform(account_id, body, delivered_to_account_id = nil)
|
||||
ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
|
||||
rescue ActiveRecord::RecordInvalid => e
|
||||
Rails.logger.debug "Error processing incoming ActivityPub object: #{e}"
|
||||
end
|
||||
end
|
||||
@@ -1,23 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::RawDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(json, source_account_id, exclude_inboxes = [])
|
||||
@account = Account.find(source_account_id)
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
|
||||
[json, @account.id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
end
|
||||
@@ -1,45 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
|
||||
# Should be removed in a subsequent release.
|
||||
|
||||
class ActivityPub::ReplyDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.thread&.account
|
||||
|
||||
return unless @account.present? && @status.distributable?
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @status.account_id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@status.account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::ActivitySerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
end
|
||||
@@ -1,13 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::SynchronizeFeaturedCollectionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', unique: :until_executed, retry: 0, dead: true
|
||||
|
||||
def perform(account_id)
|
||||
ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id))
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,40 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::UpdateDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', retry: 0, dead: true
|
||||
|
||||
def perform(account_id, options = {})
|
||||
@options = options.with_indifferent_access
|
||||
@account = Account.find(account_id)
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[signed_payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[signed_payload, @account.id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
@signed_payload ||= Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(@account, sign_with: @options[:sign_with]))
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= ActiveModelSerializers::SerializableResource.new(
|
||||
@account,
|
||||
serializer: ActivityPub::UpdateSerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
end
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class AfterAccountDomainBlockWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(account_id, domain)
|
||||
AfterBlockDomainFromAccountService.new.call(Account.find(account_id), domain)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,31 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class AfterRemoteFollowRequestWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 5
|
||||
|
||||
attr_reader :follow_request
|
||||
|
||||
def perform(follow_request_id)
|
||||
@follow_request = FollowRequest.find(follow_request_id)
|
||||
process_follow_service if processing_required?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_follow_service
|
||||
follow_request.destroy
|
||||
FollowService.new.call(follow_request.account, updated_account.acct)
|
||||
end
|
||||
|
||||
def processing_required?
|
||||
!updated_account.nil? && !updated_account.locked?
|
||||
end
|
||||
|
||||
def updated_account
|
||||
@_updated_account ||= FetchRemoteAccountService.new.call(follow_request.target_account.remote_url)
|
||||
end
|
||||
end
|
||||
@@ -1,31 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class AfterRemoteFollowWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 5
|
||||
|
||||
attr_reader :follow
|
||||
|
||||
def perform(follow_id)
|
||||
@follow = Follow.find(follow_id)
|
||||
process_follow_service if processing_required?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_follow_service
|
||||
follow.destroy
|
||||
FollowService.new.call(follow.account, updated_account.acct)
|
||||
end
|
||||
|
||||
def updated_account
|
||||
@_updated_account ||= FetchRemoteAccountService.new.call(follow.target_account.remote_url)
|
||||
end
|
||||
|
||||
def processing_required?
|
||||
!updated_account.nil? && updated_account.locked?
|
||||
end
|
||||
end
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module ExponentialBackoff
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
sidekiq_retry_in do |count|
|
||||
15 + 10 * (count**4) + rand(10 * (count**4))
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class DomainBlockWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(domain_block_id)
|
||||
BlockDomainService.new.call(DomainBlock.find(domain_block_id))
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -10,9 +10,6 @@ class FeedInsertWorker
|
||||
case @type
|
||||
when :home
|
||||
@follower = Account.find(id)
|
||||
when :list
|
||||
@list = List.find(id)
|
||||
@follower = @list.account
|
||||
end
|
||||
|
||||
check_and_insert
|
||||
@@ -36,8 +33,6 @@ class FeedInsertWorker
|
||||
case @type
|
||||
when :home
|
||||
FeedManager.instance.push_to_home(@follower, @status)
|
||||
when :list
|
||||
FeedManager.instance.push_to_list(@list, @status)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class FetchReplyWorker
|
||||
include Sidekiq::Worker
|
||||
include ExponentialBackoff
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 3
|
||||
|
||||
def perform(child_url)
|
||||
FetchRemoteStatusService.new.call(child_url)
|
||||
end
|
||||
end
|
||||
@@ -1,32 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Import::RelationshipWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 8, dead: false
|
||||
|
||||
def perform(account_id, target_account_uri, relationship, options = {})
|
||||
from_account = Account.find(account_id)
|
||||
target_account = ResolveAccountService.new.call(target_account_uri)
|
||||
options.symbolize_keys!
|
||||
|
||||
return if target_account.nil?
|
||||
|
||||
case relationship
|
||||
when 'follow'
|
||||
FollowService.new.call(from_account, target_account, options)
|
||||
when 'unfollow'
|
||||
UnfollowService.new.call(from_account, target_account)
|
||||
when 'block'
|
||||
BlockService.new.call(from_account, target_account)
|
||||
when 'unblock'
|
||||
UnblockService.new.call(from_account, target_account)
|
||||
when 'mute'
|
||||
MuteService.new.call(from_account, target_account, options)
|
||||
when 'unmute'
|
||||
UnmuteService.new.call(from_account, target_account)
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,14 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ImportWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: false
|
||||
|
||||
def perform(import_id)
|
||||
import = Import.find(import_id)
|
||||
ImportService.new.call(import)
|
||||
ensure
|
||||
import&.destroy
|
||||
end
|
||||
end
|
||||
@@ -5,7 +5,5 @@ class NotificationWorker
|
||||
|
||||
sidekiq_options queue: 'push', retry: 5
|
||||
|
||||
def perform(xml, source_account_id, target_account_id)
|
||||
SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id))
|
||||
end
|
||||
def perform(xml, source_account_id, target_account_id); end
|
||||
end
|
||||
|
||||
@@ -10,7 +10,6 @@ class PollExpirationNotifyWorker
|
||||
|
||||
# Notify poll owner and remote voters
|
||||
if poll.local?
|
||||
ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id)
|
||||
NotifyService.new.call(poll.account, poll)
|
||||
end
|
||||
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ProcessingWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options backtrace: true
|
||||
|
||||
def perform(account_id, body)
|
||||
ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true)
|
||||
end
|
||||
end
|
||||
@@ -1,82 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::ConfirmationWorker
|
||||
include Sidekiq::Worker
|
||||
include RoutingHelper
|
||||
|
||||
sidekiq_options queue: 'push', retry: false
|
||||
|
||||
attr_reader :subscription, :mode, :secret, :lease_seconds
|
||||
|
||||
def perform(subscription_id, mode, secret = nil, lease_seconds = nil)
|
||||
@subscription = Subscription.find(subscription_id)
|
||||
@mode = mode
|
||||
@secret = secret
|
||||
@lease_seconds = lease_seconds
|
||||
process_confirmation
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_confirmation
|
||||
prepare_subscription
|
||||
|
||||
callback_get_with_params
|
||||
logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}"
|
||||
|
||||
update_subscription
|
||||
end
|
||||
|
||||
def update_subscription
|
||||
if successful_subscribe?
|
||||
subscription.save!
|
||||
elsif successful_unsubscribe?
|
||||
subscription.destroy!
|
||||
end
|
||||
end
|
||||
|
||||
def successful_subscribe?
|
||||
subscribing? && response_matches_challenge?
|
||||
end
|
||||
|
||||
def successful_unsubscribe?
|
||||
(unsubscribing? && response_matches_challenge?) || !subscription.confirmed?
|
||||
end
|
||||
|
||||
def response_matches_challenge?
|
||||
@callback_response_body == challenge
|
||||
end
|
||||
|
||||
def subscribing?
|
||||
mode == 'subscribe'
|
||||
end
|
||||
|
||||
def unsubscribing?
|
||||
mode == 'unsubscribe'
|
||||
end
|
||||
|
||||
def callback_get_with_params
|
||||
Request.new(:get, subscription.callback_url, params: callback_params).perform do |response|
|
||||
@callback_response_body = response.body_with_limit
|
||||
end
|
||||
end
|
||||
|
||||
def callback_params
|
||||
{
|
||||
'hub.topic': account_url(subscription.account, format: :atom),
|
||||
'hub.mode': mode,
|
||||
'hub.challenge': challenge,
|
||||
'hub.lease_seconds': subscription.lease_seconds,
|
||||
}
|
||||
end
|
||||
|
||||
def prepare_subscription
|
||||
subscription.secret = secret
|
||||
subscription.lease_seconds = lease_seconds
|
||||
subscription.confirmed = true
|
||||
end
|
||||
|
||||
def challenge
|
||||
@_challenge ||= SecureRandom.hex
|
||||
end
|
||||
end
|
||||
@@ -1,81 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::DeliveryWorker
|
||||
include Sidekiq::Worker
|
||||
include RoutingHelper
|
||||
|
||||
sidekiq_options queue: 'push', retry: 3, dead: false
|
||||
|
||||
sidekiq_retry_in do |count|
|
||||
5 * (count + 1)
|
||||
end
|
||||
|
||||
attr_reader :subscription, :payload
|
||||
|
||||
def perform(subscription_id, payload)
|
||||
@subscription = Subscription.find(subscription_id)
|
||||
@payload = payload
|
||||
process_delivery unless blocked_domain?
|
||||
rescue => e
|
||||
raise e.class, "Delivery failed for #{subscription&.callback_url}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_delivery
|
||||
callback_post_payload do |payload_delivery|
|
||||
raise GabSocial::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery
|
||||
end
|
||||
|
||||
subscription.touch(:last_successful_delivery_at)
|
||||
end
|
||||
|
||||
def callback_post_payload(&block)
|
||||
request = Request.new(:post, subscription.callback_url, body: payload)
|
||||
request.add_headers(headers)
|
||||
request.perform(&block)
|
||||
end
|
||||
|
||||
def blocked_domain?
|
||||
DomainBlock.blocked?(host)
|
||||
end
|
||||
|
||||
def host
|
||||
Addressable::URI.parse(subscription.callback_url).normalized_host
|
||||
end
|
||||
|
||||
def headers
|
||||
{
|
||||
'Content-Type' => 'application/atom+xml',
|
||||
'Link' => link_header,
|
||||
}.merge(signature_headers.to_h)
|
||||
end
|
||||
|
||||
def link_header
|
||||
LinkHeader.new([hub_link_header, self_link_header]).to_s
|
||||
end
|
||||
|
||||
def hub_link_header
|
||||
[api_push_url, [%w(rel hub)]]
|
||||
end
|
||||
|
||||
def self_link_header
|
||||
[account_url(subscription.account, format: :atom), [%w(rel self)]]
|
||||
end
|
||||
|
||||
def signature_headers
|
||||
{ 'X-Hub-Signature' => payload_signature } if subscription.secret?
|
||||
end
|
||||
|
||||
def payload_signature
|
||||
"sha1=#{hmac_payload_digest}"
|
||||
end
|
||||
|
||||
def hmac_payload_digest
|
||||
OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload)
|
||||
end
|
||||
|
||||
def response_successful?(payload_delivery)
|
||||
payload_delivery.code > 199 && payload_delivery.code < 300
|
||||
end
|
||||
end
|
||||
@@ -1,32 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::DistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(stream_entry_ids)
|
||||
stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.hidden? }
|
||||
|
||||
return if stream_entries.empty?
|
||||
|
||||
@account = stream_entries.first.account
|
||||
@subscriptions = active_subscriptions.to_a
|
||||
|
||||
distribute_public!(stream_entries)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def distribute_public!(stream_entries)
|
||||
@payload = OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.feed(@account, stream_entries))
|
||||
|
||||
Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription_id|
|
||||
[subscription_id, @payload]
|
||||
end
|
||||
end
|
||||
|
||||
def active_subscriptions
|
||||
Subscription.where(account: @account).active.pluck(:id)
|
||||
end
|
||||
end
|
||||
@@ -1,22 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::RawDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(xml, source_account_id)
|
||||
@account = Account.find(source_account_id)
|
||||
@subscriptions = active_subscriptions.to_a
|
||||
|
||||
Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription|
|
||||
[subscription.id, xml]
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def active_subscriptions
|
||||
Subscription.where(account: @account).active.select('id, callback_url, domain')
|
||||
end
|
||||
end
|
||||
@@ -1,34 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::SubscribeWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false
|
||||
|
||||
sidekiq_retry_in do |count|
|
||||
case count
|
||||
when 0
|
||||
30.minutes.seconds
|
||||
when 1
|
||||
2.hours.seconds
|
||||
when 2
|
||||
12.hours.seconds
|
||||
else
|
||||
24.hours.seconds * (count - 2)
|
||||
end
|
||||
end
|
||||
|
||||
sidekiq_retries_exhausted do |msg, _e|
|
||||
account = Account.find(msg['args'].first)
|
||||
Sidekiq.logger.error "PuSH subscription attempts for #{account.acct} exhausted. Unsubscribing"
|
||||
::UnsubscribeService.new.call(account)
|
||||
end
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
logger.debug "PuSH re-subscribing to #{account.acct}"
|
||||
::SubscribeService.new.call(account)
|
||||
rescue => e
|
||||
raise e.class, "Subscribe failed for #{account&.acct}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
end
|
||||
@@ -1,15 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::UnsubscribeWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
logger.debug "PuSH unsubscribing from #{account.acct}"
|
||||
::UnsubscribeService.new.call(account)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,15 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class PushConversationWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(conversation_account_id)
|
||||
conversation = AccountConversation.find(conversation_account_id)
|
||||
message = InlineRenderer.render(conversation, conversation.account, :conversation)
|
||||
timeline_id = "timeline:direct:#{conversation.account_id}"
|
||||
Redis.current.publish(timeline_id, Oj.dump(event: :conversation, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
|
||||
true
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,24 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class RefollowWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: false
|
||||
|
||||
def perform(target_account_id)
|
||||
target_account = Account.find(target_account_id)
|
||||
return unless target_account.protocol == :activitypub
|
||||
|
||||
target_account.followers.where(domain: nil).reorder(nil).find_each do |follower|
|
||||
# Locally unfollow remote account
|
||||
follower.unfollow!(target_account)
|
||||
|
||||
# Schedule re-follow
|
||||
begin
|
||||
FollowService.new.call(follower, target_account)
|
||||
rescue GabSocial::NotPermittedError, ActiveRecord::RecordNotFound, GabSocial::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError
|
||||
next
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,13 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class RemoteProfileUpdateWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull'
|
||||
|
||||
def perform(account_id, body, resubscribe)
|
||||
UpdateRemoteProfileService.new.call(body, Account.find(account_id), resubscribe)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ResolveAccountWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', unique: :until_executed
|
||||
|
||||
def perform(uri)
|
||||
ResolveAccountService.new.call(uri)
|
||||
end
|
||||
end
|
||||
@@ -1,13 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class SalmonWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options backtrace: true
|
||||
|
||||
def perform(account_id, body)
|
||||
ProcessInteractionService.new.call(body, Account.find(account_id))
|
||||
rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,11 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Scheduler::SubscriptionsCleanupScheduler
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options unique: :until_executed, retry: 0
|
||||
|
||||
def perform
|
||||
Subscription.expired.in_batches.delete_all
|
||||
end
|
||||
end
|
||||
@@ -1,17 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Scheduler::SubscriptionsScheduler
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options unique: :until_executed, retry: 0
|
||||
|
||||
def perform
|
||||
Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id))
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def expiring_accounts
|
||||
Account.expiring(1.day.from_now).partitioned
|
||||
end
|
||||
end
|
||||
@@ -1,14 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Scheduler::UserCleanupScheduler
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options unique: :until_executed, retry: 0
|
||||
|
||||
def perform
|
||||
User.where('confirmed_at is NULL AND confirmation_sent_at <= ?', 2.days.ago).reorder(nil).find_in_batches do |batch|
|
||||
Account.where(id: batch.map(&:account_id)).delete_all
|
||||
User.where(id: batch.map(&:id)).delete_all
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,18 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ThreadResolveWorker
|
||||
include Sidekiq::Worker
|
||||
include ExponentialBackoff
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 3
|
||||
|
||||
def perform(child_status_id, parent_url)
|
||||
child_status = Status.find(child_status_id)
|
||||
parent_status = FetchRemoteStatusService.new.call(parent_url)
|
||||
|
||||
return if parent_status.nil?
|
||||
|
||||
child_status.thread = parent_status
|
||||
child_status.save!
|
||||
end
|
||||
end
|
||||
@@ -1,18 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class UnfollowFollowWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull'
|
||||
|
||||
def perform(follower_account_id, old_target_account_id, new_target_account_id)
|
||||
follower_account = Account.find(follower_account_id)
|
||||
old_target_account = Account.find(old_target_account_id)
|
||||
new_target_account = Account.find(new_target_account_id)
|
||||
|
||||
FollowService.new.call(follower_account, new_target_account)
|
||||
UnfollowService.new.call(follower_account, old_target_account)
|
||||
rescue ActiveRecord::RecordNotFound, GabSocial::NotPermittedError
|
||||
true
|
||||
end
|
||||
end
|
||||
@@ -1,20 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class VerifyAccountLinksWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', retry: false, unique: :until_executed
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
|
||||
account.fields.each do |field|
|
||||
next unless !field.verified? && field.verifiable?
|
||||
VerifyLinkService.new.call(field)
|
||||
end
|
||||
|
||||
account.save! if account.changed?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user