Gab Social. All are welcome.
This commit is contained in:
60
app/workers/activitypub/delivery_worker.rb
Normal file
60
app/workers/activitypub/delivery_worker.rb
Normal file
@@ -0,0 +1,60 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DeliveryWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
STOPLIGHT_FAILURE_THRESHOLD = 10
|
||||
STOPLIGHT_COOLDOWN = 60
|
||||
|
||||
sidekiq_options queue: 'push', retry: 16, dead: false
|
||||
|
||||
HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
|
||||
|
||||
def perform(json, source_account_id, inbox_url, options = {})
|
||||
return if DeliveryFailureTracker.unavailable?(inbox_url)
|
||||
|
||||
@options = options.with_indifferent_access
|
||||
@json = json
|
||||
@source_account = Account.find(source_account_id)
|
||||
@inbox_url = inbox_url
|
||||
|
||||
perform_request
|
||||
|
||||
failure_tracker.track_success!
|
||||
rescue => e
|
||||
failure_tracker.track_failure!
|
||||
raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_request
|
||||
request = Request.new(:post, @inbox_url, body: @json)
|
||||
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
|
||||
request.add_headers(HEADERS)
|
||||
end
|
||||
|
||||
def perform_request
|
||||
light = Stoplight(@inbox_url) do
|
||||
build_request.perform do |response|
|
||||
raise GabSocial::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
|
||||
end
|
||||
end
|
||||
|
||||
light.with_threshold(STOPLIGHT_FAILURE_THRESHOLD)
|
||||
.with_cool_off_time(STOPLIGHT_COOLDOWN)
|
||||
.run
|
||||
end
|
||||
|
||||
def response_successful?(response)
|
||||
(200...300).cover?(response.code)
|
||||
end
|
||||
|
||||
def response_error_unsalvageable?(response)
|
||||
(400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)
|
||||
end
|
||||
|
||||
def failure_tracker
|
||||
@failure_tracker ||= DeliveryFailureTracker.new(@inbox_url)
|
||||
end
|
||||
end
|
||||
65
app/workers/activitypub/distribute_poll_update_worker.rb
Normal file
65
app/workers/activitypub/distribute_poll_update_worker.rb
Normal file
@@ -0,0 +1,65 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DistributePollUpdateWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push', unique: :until_executed, retry: 0
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.account
|
||||
|
||||
return unless @status.preloadable_poll
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
relay! if relayable?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def relayable?
|
||||
@status.public_visibility?
|
||||
end
|
||||
|
||||
def inboxes
|
||||
return @inboxes if defined?(@inboxes)
|
||||
|
||||
@inboxes = [@status.mentions, @status.reblogs, @status.preloadable_poll.votes].flat_map do |relation|
|
||||
relation.includes(:account).map do |record|
|
||||
record.account.preferred_inbox_url if !record.account.local? && record.account.activitypub?
|
||||
end
|
||||
end
|
||||
|
||||
@inboxes.concat(@account.followers.inboxes) unless @status.direct_visibility?
|
||||
@inboxes.uniq!
|
||||
@inboxes.compact!
|
||||
@inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::UpdatePollSerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
|
||||
def relay!
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
end
|
||||
end
|
||||
65
app/workers/activitypub/distribution_worker.rb
Normal file
65
app/workers/activitypub/distribution_worker.rb
Normal file
@@ -0,0 +1,65 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::DistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.account
|
||||
|
||||
return if skip_distribution?
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
relay! if relayable?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def skip_distribution?
|
||||
@status.direct_visibility? || @status.limited_visibility?
|
||||
end
|
||||
|
||||
def relayable?
|
||||
@status.public_visibility?
|
||||
end
|
||||
|
||||
def inboxes
|
||||
# Deliver the status to all followers.
|
||||
# If the status is a reply to another local status, also forward it to that
|
||||
# status' authors' followers.
|
||||
@inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable?
|
||||
@account.followers.or(@status.thread.account.followers).inboxes
|
||||
else
|
||||
@account.followers.inboxes
|
||||
end
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::ActivitySerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
|
||||
def relay!
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[payload, @account.id, inbox_url]
|
||||
end
|
||||
end
|
||||
end
|
||||
14
app/workers/activitypub/fetch_replies_worker.rb
Normal file
14
app/workers/activitypub/fetch_replies_worker.rb
Normal file
@@ -0,0 +1,14 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::FetchRepliesWorker
|
||||
include Sidekiq::Worker
|
||||
include ExponentialBackoff
|
||||
|
||||
sidekiq_options queue: 'pull', retry: 3
|
||||
|
||||
def perform(parent_status_id, replies_uri)
|
||||
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
5
app/workers/activitypub/low_priority_delivery_worker.rb
Normal file
5
app/workers/activitypub/low_priority_delivery_worker.rb
Normal file
@@ -0,0 +1,5 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::LowPriorityDeliveryWorker < ActivityPub::DeliveryWorker
|
||||
sidekiq_options queue: 'pull', retry: 8, dead: false
|
||||
end
|
||||
15
app/workers/activitypub/post_upgrade_worker.rb
Normal file
15
app/workers/activitypub/post_upgrade_worker.rb
Normal file
@@ -0,0 +1,15 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::PostUpgradeWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull'
|
||||
|
||||
def perform(domain)
|
||||
Account.where(domain: domain)
|
||||
.where(protocol: :ostatus)
|
||||
.where.not(last_webfingered_at: nil)
|
||||
.in_batches
|
||||
.update_all(last_webfingered_at: nil)
|
||||
end
|
||||
end
|
||||
13
app/workers/activitypub/processing_worker.rb
Normal file
13
app/workers/activitypub/processing_worker.rb
Normal file
@@ -0,0 +1,13 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::ProcessingWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options backtrace: true
|
||||
|
||||
def perform(account_id, body, delivered_to_account_id = nil)
|
||||
ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
|
||||
rescue ActiveRecord::RecordInvalid => e
|
||||
Rails.logger.debug "Error processing incoming ActivityPub object: #{e}"
|
||||
end
|
||||
end
|
||||
23
app/workers/activitypub/raw_distribution_worker.rb
Normal file
23
app/workers/activitypub/raw_distribution_worker.rb
Normal file
@@ -0,0 +1,23 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::RawDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(json, source_account_id, exclude_inboxes = [])
|
||||
@account = Account.find(source_account_id)
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
|
||||
[json, @account.id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
end
|
||||
45
app/workers/activitypub/reply_distribution_worker.rb
Normal file
45
app/workers/activitypub/reply_distribution_worker.rb
Normal file
@@ -0,0 +1,45 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
|
||||
# Should be removed in a subsequent release.
|
||||
|
||||
class ActivityPub::ReplyDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(status_id)
|
||||
@status = Status.find(status_id)
|
||||
@account = @status.thread&.account
|
||||
|
||||
return unless @account.present? && @status.distributable?
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[payload, @status.account_id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@status.account))
|
||||
end
|
||||
|
||||
def unsigned_payload
|
||||
ActiveModelSerializers::SerializableResource.new(
|
||||
@status,
|
||||
serializer: ActivityPub::ActivitySerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload)
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,13 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::SynchronizeFeaturedCollectionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'pull', unique: :until_executed
|
||||
|
||||
def perform(account_id)
|
||||
ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id))
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
end
|
||||
40
app/workers/activitypub/update_distribution_worker.rb
Normal file
40
app/workers/activitypub/update_distribution_worker.rb
Normal file
@@ -0,0 +1,40 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ActivityPub::UpdateDistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(account_id, options = {})
|
||||
@options = options.with_indifferent_access
|
||||
@account = Account.find(account_id)
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
|
||||
[signed_payload, @account.id, inbox_url]
|
||||
end
|
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
|
||||
[signed_payload, @account.id, inbox_url]
|
||||
end
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inboxes
|
||||
@inboxes ||= @account.followers.inboxes
|
||||
end
|
||||
|
||||
def signed_payload
|
||||
@signed_payload ||= Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(@account, sign_with: @options[:sign_with]))
|
||||
end
|
||||
|
||||
def payload
|
||||
@payload ||= ActiveModelSerializers::SerializableResource.new(
|
||||
@account,
|
||||
serializer: ActivityPub::UpdateSerializer,
|
||||
adapter: ActivityPub::Adapter
|
||||
).as_json
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user