summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2021-04-21 04:46:09 +0200
committerGitHub <noreply@github.com>2021-04-21 04:46:09 +0200
commit0b36e3419d4c4ce175f9db266ef5b3a49a9b3974 (patch)
tree2aa9992c320c0bb23c493e6d1ee84d9bc2589bb4 /app
parent2c322addf378d17b3962b545572a43cc9d36e526 (diff)
Fix processing of remote Delete activities (#16084)
* Add tests * Ensure deleted statuses are marked as such * Save some redis memory by not storing URIs in delete_upon_arrival values * Avoid possible race condition when processing incoming Deletes * Avoid potential duplicate Delete forwards * Lower lock durations to reduce issues in case of hard crash of the Rails process * Check for `lock.aquired?` and improve comment * Refactor RedisLock usage in app/lib/activitypub * Fix using incorrect or non-existent sender for relaying Deletes
Diffstat (limited to 'app')
-rw-r--r--app/lib/activitypub/activity.rb14
-rw-r--r--app/lib/activitypub/activity/announce.rb36
-rw-r--r--app/lib/activitypub/activity/create.rb36
-rw-r--r--app/lib/activitypub/activity/delete.rb62
-rw-r--r--app/services/remove_status_service.rb2
5 files changed, 71 insertions, 79 deletions
diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb
index 2b5d3ffc29c..3baee4ca465 100644
--- a/app/lib/activitypub/activity.rb
+++ b/app/lib/activitypub/activity.rb
@@ -144,7 +144,7 @@ class ActivityPub::Activity
end
def delete_later!(uri)
- redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
+ redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true)
end
def status_from_object
@@ -210,12 +210,22 @@ class ActivityPub::Activity
end
end
- def lock_or_return(key, expire_after = 7.days.seconds)
+ def lock_or_return(key, expire_after = 2.hours.seconds)
yield if redis.set(key, true, nx: true, ex: expire_after)
ensure
redis.del(key)
end
+ def lock_or_fail(key)
+ RedisLock.acquire({ redis: Redis.current, key: key }) do |lock|
+ if lock.acquired?
+ yield
+ else
+ raise Mastodon::RaceConditionError
+ end
+ end
+ end
+
def fetch?
!@options[:delivery]
end
diff --git a/app/lib/activitypub/activity/announce.rb b/app/lib/activitypub/activity/announce.rb
index ae8b2db754e..a1081522eb1 100644
--- a/app/lib/activitypub/activity/announce.rb
+++ b/app/lib/activitypub/activity/announce.rb
@@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- original_status = status_from_object
+ lock_or_fail("announce:#{@object['id']}") do
+ original_status = status_from_object
- return reject_payload! if original_status.nil? || !announceable?(original_status)
+ return reject_payload! if original_status.nil? || !announceable?(original_status)
- @status = Status.find_by(account: @account, reblog: original_status)
+ @status = Status.find_by(account: @account, reblog: original_status)
- return @status unless @status.nil?
+ return @status unless @status.nil?
- @status = Status.create!(
- account: @account,
- reblog: original_status,
- uri: @json['id'],
- created_at: @json['published'],
- override_timestamps: @options[:override_timestamps],
- visibility: visibility_from_audience
- )
+ @status = Status.create!(
+ account: @account,
+ reblog: original_status,
+ uri: @json['id'],
+ created_at: @json['published'],
+ override_timestamps: @options[:override_timestamps],
+ visibility: visibility_from_audience
+ )
- distribute(@status)
- else
- raise Mastodon::RaceConditionError
- end
+ distribute(@status)
end
@status
@@ -69,8 +65,4 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def reblog_of_local_status?
status_from_uri(object_uri)&.account&.local?
end
-
- def lock_options
- { redis: Redis.current, key: "announce:#{@object['id']}" }
- end
end
diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb
index 9f6dd9ce027..c7a655d9db4 100644
--- a/app/lib/activitypub/activity/create.rb
+++ b/app/lib/activitypub/activity/create.rb
@@ -45,19 +45,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def create_status
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
+ lock_or_fail("create:#{object_uri}") do
+ return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
- @status = find_existing_status
+ @status = find_existing_status
- if @status.nil?
- process_status
- elsif @options[:delivered_to_account_id].present?
- postprocess_audience_and_deliver
- end
- else
- raise Mastodon::RaceConditionError
+ if @status.nil?
+ process_status
+ elsif @options[:delivered_to_account_id].present?
+ postprocess_audience_and_deliver
end
end
@@ -313,13 +309,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
poll = replied_to_status.preloadable_poll
already_voted = true
- RedisLock.acquire(poll_lock_options) do |lock|
- if lock.acquired?
- already_voted = poll.votes.where(account: @account).exists?
- poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
- else
- raise Mastodon::RaceConditionError
- end
+ lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
+ already_voted = poll.votes.where(account: @account).exists?
+ poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
end
increment_voters_count! unless already_voted
@@ -508,12 +500,4 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
poll.reload
retry
end
-
- def lock_options
- { redis: Redis.current, key: "create:#{object_uri}" }
- end
-
- def poll_lock_options
- { redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" }
- end
end
diff --git a/app/lib/activitypub/activity/delete.rb b/app/lib/activitypub/activity/delete.rb
index 2e5293b832a..801647cf74a 100644
--- a/app/lib/activitypub/activity/delete.rb
+++ b/app/lib/activitypub/activity/delete.rb
@@ -20,33 +20,35 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
def delete_note
return if object_uri.nil?
- unless invalid_origin?(object_uri)
- RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) }
- Tombstone.find_or_create_by(uri: object_uri, account: @account)
- end
+ lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
+ unless invalid_origin?(object_uri)
+ # This lock ensures a concurrent `ActivityPub::Activity::Create` either
+ # does not create a status at all, or has finished saving it to the
+ # database before we try to load it.
+ # Without the lock, `delete_later!` could be called after `delete_arrived_first?`
+ # and `Status.find` before `Status.create!`
+ lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
- @status = Status.find_by(uri: object_uri, account: @account)
- @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
+ Tombstone.find_or_create_by(uri: object_uri, account: @account)
+ end
- return if @status.nil?
+ @status = Status.find_by(uri: object_uri, account: @account)
+ @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
- if @status.distributable?
- forward_for_reply
- forward_for_reblogs
- end
+ return if @status.nil?
- delete_now!
+ forward! if @json['signature'].present? && @status.distributable?
+ delete_now!
+ end
end
- def forward_for_reblogs
- return if @json['signature'].blank?
-
- rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
- inboxes = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url]
+ def rebloggers_ids
+ return @rebloggers_ids if defined?(@rebloggers_ids)
+ @rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
+ end
- ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
- [payload, rebloggers_ids.first, inbox_url]
- end
+ def inboxes_for_reblogs
+ Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes
end
def replied_to_status
@@ -58,13 +60,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
!replied_to_status.nil? && replied_to_status.account.local?
end
- def forward_for_reply
- return unless @json['signature'].present? && reply_to_local?
+ def inboxes_for_reply
+ replied_to_status.account.followers.inboxes
+ end
+
+ def forward!
+ inboxes = inboxes_for_reblogs
+ inboxes += inboxes_for_reply if reply_to_local?
+ inboxes -= [@account.preferred_inbox_url]
- inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url]
+ sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first
- ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
- [payload, replied_to_status.account_id, inbox_url]
+ ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url|
+ [payload, sender_id, inbox_url]
end
end
@@ -75,8 +83,4 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
def payload
@payload ||= Oj.dump(@json)
end
-
- def lock_options
- { redis: Redis.current, key: "create:#{object_uri}" }
- end
end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index 2c2a2664156..6e4d6e72ab2 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -16,6 +16,8 @@ class RemoveStatusService < BaseService
@account = status.account
@options = options
+ @status.discard
+
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
remove_from_self if @account.local?