summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-08-13 00:44:41 +0200
committerGitHub <noreply@github.com>2017-08-13 00:44:41 +0200
commitb7370ac8baa643d93ea727699b3b11f9d3a55bea (patch)
tree869a8c2d44f78d96255ae0bf20a84c150ca23702
parentccdd5a9576819cdc95946d98fea0e3c8bbd1d626 (diff)
ActivityPub delivery (#4566)
* Deliver ActivityPub Like * Deliver ActivityPub Undo-Like * Deliver ActivityPub Create/Announce activities * Deliver ActivityPub creates from mentions * Deliver ActivityPub Block/Undo-Block * Deliver ActivityPub Accept/Reject-Follow * Deliver ActivityPub Undo-Follow * Deliver ActivityPub Follow * Deliver ActivityPub Delete activities Incidentally fix #889 * Adjust BatchedRemoveStatusService for ActivityPub * Add tests for ActivityPub workers * Add tests for FollowService * Add tests for FavouriteService, UnfollowService and PostStatusService * Add tests for ReblogService, BlockService, UnblockService, ProcessMentionsService * Add tests for AuthorizeFollowService, RejectFollowService, RemoveStatusService * Add tests for BatchedRemoveStatusService * Deliver updates to a local account to ActivityPub followers * Minor adjustments
-rw-r--r--app/controllers/api/v1/accounts/credentials_controller.rb3
-rw-r--r--app/controllers/settings/profiles_controller.rb1
-rw-r--r--app/lib/activitypub/activity.rb2
-rw-r--r--app/models/account.rb4
-rw-r--r--app/services/authorize_follow_service.rb19
-rw-r--r--app/services/batched_remove_status_service.rb43
-rw-r--r--app/services/block_service.rb19
-rw-r--r--app/services/favourite_service.rb28
-rw-r--r--app/services/follow_service.rb14
-rw-r--r--app/services/post_status_service.rb1
-rw-r--r--app/services/process_mentions_service.rb28
-rw-r--r--app/services/reblog_service.rb28
-rw-r--r--app/services/reject_follow_service.rb19
-rw-r--r--app/services/remove_status_service.rb49
-rw-r--r--app/services/unblock_service.rb19
-rw-r--r--app/services/unfavourite_service.rb22
-rw-r--r--app/services/unfollow_service.rb19
-rw-r--r--app/workers/activitypub/delivery_worker.rb37
-rw-r--r--app/workers/activitypub/distribution_worker.rb38
-rw-r--r--app/workers/activitypub/processing_worker.rb2
-rw-r--r--app/workers/activitypub/update_distribution_worker.rb31
-rw-r--r--spec/controllers/api/v1/accounts/credentials_controller_spec.rb6
-rw-r--r--spec/controllers/settings/profiles_controller_spec.rb2
-rw-r--r--spec/services/authorize_follow_service_spec.rb24
-rw-r--r--spec/services/batched_remove_status_service_spec.rb7
-rw-r--r--spec/services/block_service_spec.rb19
-rw-r--r--spec/services/favourite_service_spec.rb22
-rw-r--r--spec/services/follow_service_spec.rb25
-rw-r--r--spec/services/post_status_service_spec.rb8
-rw-r--r--spec/services/process_mentions_service_spec.rb46
-rw-r--r--spec/services/reblog_service_spec.rb49
-rw-r--r--spec/services/reject_follow_service_spec.rb24
-rw-r--r--spec/services/remove_status_service_spec.rb8
-rw-r--r--spec/services/resolve_remote_account_service_spec.rb66
-rw-r--r--spec/services/unblock_service_spec.rb22
-rw-r--r--spec/services/unfollow_service_spec.rb22
-rw-r--r--spec/workers/activitypub/delivery_worker_spec.rb23
-rw-r--r--spec/workers/activitypub/distribution_worker_spec.rb48
-rw-r--r--spec/workers/activitypub/processing_worker_spec.rb15
-rw-r--r--spec/workers/activitypub/thread_resolve_worker_spec.rb16
-rw-r--r--spec/workers/activitypub/update_distribution_worker_spec.rb20
41 files changed, 785 insertions, 113 deletions
diff --git a/app/controllers/api/v1/accounts/credentials_controller.rb b/app/controllers/api/v1/accounts/credentials_controller.rb
index 073808532a5..90a580c3362 100644
--- a/app/controllers/api/v1/accounts/credentials_controller.rb
+++ b/app/controllers/api/v1/accounts/credentials_controller.rb
@@ -10,8 +10,9 @@ class Api::V1::Accounts::CredentialsController < Api::BaseController
end
def update
- current_account.update!(account_params)
@account = current_account
+ @account.update!(account_params)
+ ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
render json: @account, serializer: REST::CredentialAccountSerializer
end
diff --git a/app/controllers/settings/profiles_controller.rb b/app/controllers/settings/profiles_controller.rb
index 0367e359364..c751c64ae97 100644
--- a/app/controllers/settings/profiles_controller.rb
+++ b/app/controllers/settings/profiles_controller.rb
@@ -15,6 +15,7 @@ class Settings::ProfilesController < ApplicationController
def update
if @account.update(account_params)
+ ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
redirect_to settings_profile_path, notice: I18n.t('generic.changes_saved_msg')
else
render :show
diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb
index 5debe023a33..f8de8060c4e 100644
--- a/app/lib/activitypub/activity.rb
+++ b/app/lib/activitypub/activity.rb
@@ -93,7 +93,7 @@ class ActivityPub::Activity
end
def distribute_to_followers(status)
- DistributionWorker.perform_async(status.id)
+ ::DistributionWorker.perform_async(status.id)
end
def delete_arrived_first?(uri)
diff --git a/app/models/account.rb b/app/models/account.rb
index 163bd1c0eae..a7264353e75 100644
--- a/app/models/account.rb
+++ b/app/models/account.rb
@@ -171,6 +171,10 @@ class Account < ApplicationRecord
reorder(nil).pluck('distinct accounts.domain')
end
+ def inboxes
+ reorder(nil).where(protocol: :activitypub).pluck("distinct coalesce(nullif(accounts.shared_inbox_url, ''), accounts.inbox_url)")
+ end
+
def triadic_closures(account, limit: 5, offset: 0)
sql = <<-SQL.squish
WITH first_degree AS (
diff --git a/app/services/authorize_follow_service.rb b/app/services/authorize_follow_service.rb
index 41815a393e9..db35b6030bd 100644
--- a/app/services/authorize_follow_service.rb
+++ b/app/services/authorize_follow_service.rb
@@ -4,11 +4,28 @@ class AuthorizeFollowService < BaseService
def call(source_account, target_account)
follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
follow_request.authorize!
- NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
+ create_notification(follow_request) unless source_account.local?
+ follow_request
end
private
+ def create_notification(follow_request)
+ if follow_request.account.ostatus?
+ NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)
+ elsif follow_request.account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url)
+ end
+ end
+
+ def build_json(follow_request)
+ ActiveModelSerializers::SerializableResource.new(
+ follow_request,
+ serializer: ActivityPub::AcceptFollowSerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
+
def build_xml(follow_request)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.authorize_follow_request_salmon(follow_request))
end
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index ab810c628ed..e6c8c9208c8 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -15,9 +15,11 @@ class BatchedRemoveStatusService < BaseService
@mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h
@tags = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h
- @stream_entry_batches = []
- @salmon_batches = []
- @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+ @stream_entry_batches = []
+ @salmon_batches = []
+ @activity_json_batches = []
+ @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+ @activity_json = {}
# Ensure that rendered XML reflects destroyed state
Status.where(id: statuses.map(&:id)).in_batches.destroy_all
@@ -27,7 +29,11 @@ class BatchedRemoveStatusService < BaseService
account = account_statuses.first.account
unpush_from_home_timelines(account_statuses)
- batch_stream_entries(account_statuses) if account.local?
+
+ if account.local?
+ batch_stream_entries(account_statuses)
+ batch_activity_json(account, account_statuses)
+ end
end
# Cannot be batched
@@ -38,6 +44,7 @@ class BatchedRemoveStatusService < BaseService
Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
+ ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch }
end
private
@@ -50,6 +57,22 @@ class BatchedRemoveStatusService < BaseService
end
end
+ def batch_activity_json(account, statuses)
+ account.followers.inboxes.each do |inbox_url|
+ statuses.each do |status|
+ @activity_json_batches << [build_json(status), account.id, inbox_url]
+ end
+ end
+
+ statuses.each do |status|
+ other_recipients = (status.mentions + status.reblogs).map(&:account).reject(&:local?).select(&:activitypub?).uniq(&:id)
+
+ other_recipients.each do |target_account|
+ @activity_json_batches << [build_json(status), account.id, target_account.inbox_url]
+ end
+ end
+ end
+
def unpush_from_home_timelines(statuses)
account = statuses.first.account
recipients = account.followers.local.pluck(:id)
@@ -79,7 +102,7 @@ class BatchedRemoveStatusService < BaseService
return if @mentions[status.id].empty?
payload = stream_entry_to_xml(status.stream_entry.reload)
- recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id)
+ recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)
recipients.each do |recipient_id|
@salmon_batches << [payload, status.account_id, recipient_id]
@@ -111,4 +134,14 @@ class BatchedRemoveStatusService < BaseService
def redis
Redis.current
end
+
+ def build_json(status)
+ return @activity_json[status.id] if @activity_json.key?(status.id)
+
+ @activity_json[status.id] = ActiveModelSerializers::SerializableResource.new(
+ status,
+ serializer: ActivityPub::DeleteSerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
end
diff --git a/app/services/block_service.rb b/app/services/block_service.rb
index 5d7bf6a3bc2..f2253226b55 100644
--- a/app/services/block_service.rb
+++ b/app/services/block_service.rb
@@ -12,11 +12,28 @@ class BlockService < BaseService
block = account.block!(target_account)
BlockWorker.perform_async(account.id, target_account.id)
- NotificationWorker.perform_async(build_xml(block), account.id, target_account.id) unless target_account.local?
+ create_notification(block) unless target_account.local?
+ block
end
private
+ def create_notification(block)
+ if block.target_account.ostatus?
+ NotificationWorker.perform_async(build_xml(block), block.account_id, block.target_account_id)
+ elsif block.target_account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(block), block.account_id, block.target_account.inbox_url)
+ end
+ end
+
+ def build_json(block)
+ ActiveModelSerializers::SerializableResource.new(
+ block,
+ serializer: ActivityPub::BlockSerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
+
def build_xml(block)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.block_salmon(block))
end
diff --git a/app/services/favourite_service.rb b/app/services/favourite_service.rb
index 291f9e56ef1..4aa935170ec 100644
--- a/app/services/favourite_service.rb
+++ b/app/services/favourite_service.rb
@@ -15,18 +15,32 @@ class FavouriteService < BaseService
return favourite unless favourite.nil?
favourite = Favourite.create!(account: account, status: status)
-
- if status.local?
- NotifyService.new.call(favourite.status.account, favourite)
- else
- NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id)
- end
-
+ create_notification(favourite)
favourite
end
private
+ def create_notification(favourite)
+ status = favourite.status
+
+ if status.account.local?
+ NotifyService.new.call(status.account, favourite)
+ elsif status.account.ostatus?
+ NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id)
+ elsif status.account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url)
+ end
+ end
+
+ def build_json(favourite)
+ ActiveModelSerializers::SerializableResource.new(
+ favourite,
+ serializer: ActivityPub::LikeSerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
+
def build_xml(favourite)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.favourite_salmon(favourite))
end
diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb
index 3155feaa498..2be625cd858 100644
--- a/app/services/follow_service.rb
+++ b/app/services/follow_service.rb
@@ -14,7 +14,7 @@ class FollowService < BaseService
return if source_account.following?(target_account)
- if target_account.locked?
+ if target_account.locked? || target_account.activitypub?
request_follow(source_account, target_account)
else
direct_follow(source_account, target_account)
@@ -28,9 +28,11 @@ class FollowService < BaseService
if target_account.local?
NotifyService.new.call(target_account, follow_request)
- else
+ elsif target_account.ostatus?
NotificationWorker.perform_async(build_follow_request_xml(follow_request), source_account.id, target_account.id)
AfterRemoteFollowRequestWorker.perform_async(follow_request.id)
+ elsif target_account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), source_account.id, target_account.inbox_url)
end
follow_request
@@ -63,4 +65,12 @@ class FollowService < BaseService
def build_follow_xml(follow)
OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.follow_salmon(follow))
end
+
+ def build_json(follow_request)
+ ActiveModelSerializers::SerializableResource.new(
+ follow_request,
+ serializer: ActivityPub::FollowSerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
end
diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb
index 951a38e1957..5ff93f21ea8 100644
--- a/app/services/post_status_service.rb
+++ b/app/services/post_status_service.rb
@@ -39,6 +39,7 @@ class PostStatusService < BaseService
LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text?
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
+ ActivityPub::DistributionWorker.perform_async(status.id)
if options[:idempotency].present?
redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
diff --git a/app/services/process_mentions_service.rb b/app/services/process_mentions_service.rb
index 438033d22b3..407fa8c182b 100644
--- a/app/services/process_mentions_service.rb
+++ b/app/services/process_mentions_service.rb
@@ -28,18 +28,32 @@ class ProcessMentionsService < BaseService
end
status.mentions.includes(:account).each do |mention|
- mentioned_account = mention.account
-
- if mentioned_account.local?
- NotifyService.new.call(mentioned_account, mention)
- else
- NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
- end
+ create_notification(status, mention)
end
end
private
+ def create_notification(status, mention)
+ mentioned_account = mention.account
+
+ if mentioned_account.local?
+ NotifyService.new.call(mentioned_account, mention)
+ elsif mentioned_account.ostatus?
+ NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
+ elsif mentioned_account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(mention.status), mention.status.account_id, mentioned_account.inbox_url)
+ end
+ end
+
+ def build_json(status)
+ ActiveModelSerializers::SerializableResource.new(
+ status,
+ serializer: ActivityPub::ActivitySerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
+ end
+
def follow_remote_account_service
@follow_remote_account_service ||= ResolveRemoteAccountService.new
end
diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb
index ba24b1f9d80..7f886af7c20 100644
--- a/app/services/reblog_service.rb
+++ b/app/services/reblog_service.rb
@@ -21,13 +21,31 @@ class ReblogService < BaseService
DistributionWorker.perform_async(reblog.id)
Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id)
+ ActivityPub::DistributionWorker.perform_async(reblog.id)
- if reblogged_status.local?
- NotifyService.new.call(reblog.reblog.account, reblog)
- else
- NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), account.id, reblog.reblog.account_id)
+ create_notification(reblog)
+ reblog
+ end
+
+ private
+
+ def create_notification(reblog)
+ reblogged_status = reblog.reblog
+
+ if reblogged_status.account.local?
+ NotifyService.new.call(reblogged_status.account, reblog)
+ elsif reblogged_status.account.ostatus?
+ NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), reblog.account_id, reblogged_status.account_id)
+ elsif reblogged_status.account.activitypub?
+ ActivityPub::DeliveryWorker.perform_async(build_json(reblog), reblog.account_id, reblogged_status.account.inbox_url)
end
+ end
- reblog
+ def build_json(reblog)
+ ActiveModelSerializers::SerializableResource.new(
+ reblog,
+ serializer: ActivityPub::ActivitySerializer,
+ adapter: ActivityPub::Adapter
+ ).to_json
end
end
diff --git a/app/services/reject_follow_service.rb b/app/services/reject_follow_service.rb
index fd7e66c2373..a91266aa4b6 100644
--- a/app/services/reject_follow_service.rb
+++ b/app/services/reject_follow_service.rb
@@ -4,11 +4,28 @@ class RejectFollowService < BaseService
def call(source_account, target_account)
follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
follow_request.reject!
- NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
+ create_notification(follow_request) unless source_account.local?
+ follow_request
end
private
+ def create_notification(follow_request)
+ if follow_request.account.ostatus?
+ NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)