gem 'concurrent-ruby', require: false
gem 'connection_pool', require: false
+
+gem 'xorcist', '~> 1.1'
+gem 'pluck_each', '~> 0.1.3'
pghero (2.7.2)
activerecord (>= 5)
pkg-config (1.4.4)
+ pluck_each (0.1.3)
+ activerecord (> 3.2.0)
+ activesupport (> 3.0.0)
posix-spawn (0.3.15)
premailer (1.14.2)
addressable
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
wisper (2.0.1)
+ xorcist (1.1.2)
xpath (3.2.0)
nokogiri (~> 1.8)
pg (~> 1.2)
pghero (~> 2.7)
pkg-config (~> 1.4)
+ pluck_each (~> 0.1.3)
posix-spawn
premailer-rails
private_address_check (~> 0.5)
webmock (~> 3.9)
webpacker (~> 5.2)
webpush
+ xorcist (~> 1.1)
--- /dev/null
+# frozen_string_literal: true
+
+class ActivityPub::FollowersSynchronizationsController < ActivityPub::BaseController
+ include SignatureVerification
+ include AccountOwnedConcern
+
+ before_action :require_signature!
+ before_action :set_items
+ before_action :set_cache_headers
+
+ def show
+ expires_in 0, public: false
+ render json: collection_presenter,
+ serializer: ActivityPub::CollectionSerializer,
+ adapter: ActivityPub::Adapter,
+ content_type: 'application/activity+json'
+ end
+
+ private
+
+ def uri_prefix
+ signed_request_account.uri[/http(s?):\/\/[^\/]+\//]
+ end
+
+ def set_items
+ @items = @account.followers.where(Account.arel_table[:uri].matches(uri_prefix + '%', false, true)).pluck(:uri)
+ end
+
+ def collection_presenter
+ ActivityPub::CollectionPresenter.new(
+ id: account_followers_synchronization_url(@account),
+ type: :ordered,
+ items: @items
+ )
+ end
+end
def create
upgrade_account
+ process_collection_synchronization
process_payload
head 202
end
DeliveryFailureTracker.reset!(signed_request_account.inbox_url)
end
+ def process_collection_synchronization
+ raw_params = request.headers['Collection-Synchronization']
+ return if raw_params.blank? || ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] == 'true'
+
+ # Re-using the syntax for signature parameters
+ tree = SignatureParamsParser.new.parse(raw_params)
+ params = SignatureParamsTransformer.new.apply(tree)
+
+ ActivityPub::PrepareFollowersSynchronizationService.new.call(signed_request_account, params)
+ rescue Parslet::ParseFailed
+ Rails.logger.warn 'Error parsing Collection-Synchronization header'
+ end
+
def process_payload
ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body, @account&.id)
end
end
end
+ def uri_for_username(username)
+ account_url(username: username)
+ end
+
def generate_uri_for(_target)
URI.join(root_url, 'payloads', SecureRandom.uuid)
end
shared_inbox_url.presence || inbox_url
end
+ def synchronization_uri_prefix
+ return 'local' if local?
+
+ @synchronization_uri_prefix ||= uri[/http(s?):\/\/[^\/]+\//]
+ end
+
class Field < ActiveModelSerializers::Model
attributes :name, :value, :verified_at, :account, :errors
.where('users.current_sign_in_at > ?', User::ACTIVE_DURATION.ago)
end
+ def remote_followers_hash(url_prefix)
+ Rails.cache.fetch("followers_hash:#{id}:#{url_prefix}") do
+ digest = "\x00" * 32
+ followers.where(Account.arel_table[:uri].matches(url_prefix + '%', false, true)).pluck_each(:uri) do |uri|
+ Xorcist.xor!(digest, Digest::SHA256.digest(uri))
+ end
+ digest.unpack('H*')[0]
+ end
+ end
+
+ def local_followers_hash
+ Rails.cache.fetch("followers_hash:#{id}:local") do
+ digest = "\x00" * 32
+ followers.where(domain: nil).pluck_each(:username) do |username|
+ Xorcist.xor!(digest, Digest::SHA256.digest(ActivityPub::TagManager.instance.uri_for_username(username)))
+ end
+ digest.unpack('H*')[0]
+ end
+ end
+
private
def remove_potential_friendship(other_account, mutual = false)
before_validation :set_uri, only: :create
after_create :increment_cache_counters
+ after_create :invalidate_hash_cache
after_destroy :remove_endorsements
after_destroy :decrement_cache_counters
+ after_destroy :invalidate_hash_cache
private
account&.decrement_count!(:following_count)
target_account&.decrement_count!(:followers_count)
end
+
+ def invalidate_hash_cache
+ return if account.local? && target_account.local?
+
+ Rails.cache.delete("followers_hash:#{target_account_id}:#{account.synchronization_uri_prefix}")
+ end
end
--- /dev/null
+# frozen_string_literal: true
+
+class ActivityPub::PrepareFollowersSynchronizationService < BaseService
+ include JsonLdHelper
+
+ def call(account, params)
+ @account = account
+
+ return if params['collectionId'] != @account.followers_url || invalid_origin?(params['url']) || @account.local_followers_hash == params['digest']
+
+ ActivityPub::FollowersSynchronizationWorker.perform_async(@account.id, params['url'])
+ end
+end
--- /dev/null
+# frozen_string_literal: true
+
+class ActivityPub::SynchronizeFollowersService < BaseService
+ include JsonLdHelper
+ include Payloadable
+
+ def call(account, partial_collection_url)
+ @account = account
+
+ items = collection_items(partial_collection_url)
+ return if items.nil?
+
+ # There could be unresolved accounts (hence the call to .compact) but this
+ # should never happen in practice, since in almost all cases we keep an
+ # Account record, and should we not do that, we should have sent a Delete.
+ # In any case there is not much we can do if that occurs.
+ @expected_followers = items.map { |uri| ActivityPub::TagManager.instance.uri_to_resource(uri, Account) }.compact
+
+ remove_unexpected_local_followers!
+ handle_unexpected_outgoing_follows!
+ end
+
+ private
+
+ def remove_unexpected_local_followers!
+ @account.followers.local.where.not(id: @expected_followers.map(&:id)).each do |unexpected_follower|
+ UnfollowService.new.call(unexpected_follower, @account)
+ end
+ end
+
+ def handle_unexpected_outgoing_follows!
+ @expected_followers.each do |expected_follower|
+ next if expected_follower.following?(@account)
+
+ if expected_follower.requested?(@account)
+ # For some reason the follow request went through but we missed it
+ expected_follower.follow_requests.find_by(target_account: @account)&.authorize!
+ else
+ # Since we were not aware of the follow from our side, we do not have an
+ # ID for it that we can include in the Undo activity. For this reason,
+ # the Undo may not work with software that relies exclusively on
+ # matching activity IDs and not the actor and target
+ follow = Follow.new(account: expected_follower, target_account: @account)
+ ActivityPub::DeliveryWorker.perform_async(build_undo_follow_json(follow), follow.account_id, follow.target_account.inbox_url)
+ end
+ end
+ end
+
+ def build_undo_follow_json(follow)
+ Oj.dump(serialize_payload(follow, ActivityPub::UndoFollowSerializer))
+ end
+
+ def collection_items(collection_or_uri)
+ collection = fetch_collection(collection_or_uri)
+ return unless collection.is_a?(Hash)
+
+ collection = fetch_collection(collection['first']) if collection['first'].present?
+ return unless collection.is_a?(Hash)
+
+ case collection['type']
+ when 'Collection', 'CollectionPage'
+ collection['items']
+ when 'OrderedCollection', 'OrderedCollectionPage'
+ collection['orderedItems']
+ end
+ end
+
+ def fetch_collection(collection_or_uri)
+ return collection_or_uri if collection_or_uri.is_a?(Hash)
+ return if invalid_origin?(collection_or_uri)
+
+ fetch_resource_without_id_validation(collection_or_uri, nil, true)
+ end
+end
class ActivityPub::DeliveryWorker
include Sidekiq::Worker
+ include RoutingHelper
include JsonLdHelper
STOPLIGHT_FAILURE_THRESHOLD = 10
Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request|
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
request.add_headers(HEADERS)
+ request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers]
end
end
+ def synchronization_header
+ "collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(inbox_url_prefix)}\", url=\"#{account_followers_synchronization_url(@source_account)}\""
+ end
+
+ def inbox_url_prefix
+ @inbox_url[/http(s?):\/\/[^\/]+\//]
+ end
+
def perform_request
light = Stoplight(@inbox_url) do
request_pool.with(@host) do |http_client|
return if skip_distribution?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
- [payload, @account.id, inbox_url]
+ [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
end
relay! if relayable?
--- /dev/null
+# frozen_string_literal: true
+
+class ActivityPub::FollowersSynchronizationWorker
+ include Sidekiq::Worker
+
+ sidekiq_options queue: 'push', lock: :until_executed
+
+ def perform(account_id, url)
+ @account = Account.find_by(id: account_id)
+ return true if @account.nil?
+
+ ActivityPub::SynchronizeFollowersService.new.call(@account, url)
+ end
+end
resource :inbox, only: [:create], module: :activitypub
resource :claim, only: [:create], module: :activitypub
resources :collections, only: [:show], module: :activitypub
+ resource :followers_synchronization, only: [:show], module: :activitypub
end
resource :inbox, only: [:create], module: :activitypub
--- /dev/null
+require 'rails_helper'
+
+RSpec.describe ActivityPub::FollowersSynchronizationsController, type: :controller do
+ let!(:account) { Fabricate(:account) }
+ let!(:follower_1) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/a') }
+ let!(:follower_2) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/b') }
+ let!(:follower_3) { Fabricate(:account, domain: 'foo.com', uri: 'https://foo.com/users/a') }
+
+ before do
+ follower_1.follow!(account)
+ follower_2.follow!(account)
+ follower_3.follow!(account)
+ end
+
+ before do
+ allow(controller).to receive(:signed_request_account).and_return(remote_account)
+ end
+
+ describe 'GET #show' do
+ context 'without signature' do
+ let(:remote_account) { nil }
+
+ before do
+ get :show, params: { account_username: account.username }
+ end
+
+ it 'returns http not authorized' do
+ expect(response).to have_http_status(401)
+ end
+ end
+
+ context 'with signature from example.com' do
+ let(:remote_account) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/instance') }
+
+ before do
+ get :show, params: { account_username: account.username }
+ end
+
+ it 'returns http success' do
+ expect(response).to have_http_status(200)
+ end
+
+ it 'returns application/activity+json' do
+ expect(response.content_type).to eq 'application/activity+json'
+ end
+
+ it 'returns orderedItems with followers from example.com' do
+ json = body_as_json
+ expect(json[:orderedItems]).to be_an Array
+ expect(json[:orderedItems].sort).to eq [follower_1.uri, follower_2.uri]
+ end
+
+ it 'returns private Cache-Control header' do
+ expect(response.headers['Cache-Control']).to eq 'max-age=0, private'
+ end
+ end
+ end
+end
end
end
+ context 'with Collection-Synchronization header' do
+ let(:remote_account) { Fabricate(:account, followers_url: 'https://example.com/followers', domain: 'example.com', uri: 'https://example.com/actor', protocol: :activitypub) }
+ let(:synchronization_collection) { remote_account.followers_url }
+ let(:synchronization_url) { 'https://example.com/followers-for-domain' }
+ let(:synchronization_hash) { 'somehash' }
+ let(:synchronization_header) { "collectionId=\"#{synchronization_collection}\", digest=\"#{synchronization_hash}\", url=\"#{synchronization_url}\"" }
+
+ before do
+ allow(ActivityPub::FollowersSynchronizationWorker).to receive(:perform_async).and_return(nil)
+ allow_any_instance_of(Account).to receive(:local_followers_hash).and_return('somehash')
+
+ request.headers['Collection-Synchronization'] = synchronization_header
+ post :create, body: '{}'
+ end
+
+ context 'with mismatching target collection' do
+ let(:synchronization_collection) { 'https://example.com/followers2' }
+
+ it 'does not start a synchronization job' do
+ expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
+ end
+ end
+
+ context 'with mismatching domain in partial collection attribute' do
+ let(:synchronization_url) { 'https://example.org/followers' }
+
+ it 'does not start a synchronization job' do
+ expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
+ end
+ end
+
+ context 'with matching digest' do
+ it 'does not start a synchronization job' do
+ expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
+ end
+ end
+
+ context 'with mismatching digest' do
+ let(:synchronization_hash) { 'wronghash' }
+
+ it 'starts a synchronization job' do
+ expect(ActivityPub::FollowersSynchronizationWorker).to have_received(:perform_async)
+ end
+ end
+
+ it 'returns http accepted' do
+ expect(response).to have_http_status(202)
+ end
+ end
+
context 'without signature' do
before do
post :create, body: '{}'
end
end
+ describe '#followers_hash' do
+ let(:me) { Fabricate(:account, username: 'Me') }
+ let(:remote_1) { Fabricate(:account, username: 'alice', domain: 'example.org', uri: 'https://example.org/users/alice') }
+ let(:remote_2) { Fabricate(:account, username: 'bob', domain: 'example.org', uri: 'https://example.org/users/bob') }
+ let(:remote_3) { Fabricate(:account, username: 'eve', domain: 'foo.org', uri: 'https://foo.org/users/eve') }
+
+ before do
+ remote_1.follow!(me)
+ remote_2.follow!(me)
+ remote_3.follow!(me)
+ me.follow!(remote_1)
+ end
+
+ context 'on a local user' do
+ it 'returns correct hash for remote domains' do
+ expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
+ expect(me.remote_followers_hash('https://foo.org/')).to eq 'ccb9c18a67134cfff9d62c7f7e7eb88e6b803446c244b84265565f4eba29df0e'
+ end
+
+ it 'invalidates cache as needed when removing or adding followers' do
+ expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
+ remote_1.unfollow!(me)
+ expect(me.remote_followers_hash('https://example.org/')).to eq '241b00794ce9b46aa864f3220afadef128318da2659782985bac5ed5bd436bff'
+ remote_1.follow!(me)
+ expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
+ end
+ end
+
+ context 'on a remote user' do
+ it 'returns correct hash for remote domains' do
+ expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
+ end
+
+ it 'invalidates cache as needed when removing or adding followers' do
+ expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
+ me.unfollow!(remote_1)
+ expect(remote_1.local_followers_hash).to eq '0000000000000000000000000000000000000000000000000000000000000000'
+ me.follow!(remote_1)
+ expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
+ end
+ end
+ end
+
describe 'muting an account' do
let(:me) { Fabricate(:account, username: 'Me') }
let(:you) { Fabricate(:account, username: 'You') }
--- /dev/null
+require 'rails_helper'
+
+RSpec.describe ActivityPub::SynchronizeFollowersService, type: :service do
+ let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account', inbox_url: 'http://example.com/inbox') }
+ let(:alice) { Fabricate(:account, username: 'alice') }
+ let(:bob) { Fabricate(:account, username: 'bob') }
+ let(:eve) { Fabricate(:account, username: 'eve') }
+ let(:mallory) { Fabricate(:account, username: 'mallory') }
+ let(:collection_uri) { 'http://example.com/partial-followers' }
+
+ let(:items) do
+ [
+ ActivityPub::TagManager.instance.uri_for(alice),
+ ActivityPub::TagManager.instance.uri_for(eve),
+ ActivityPub::TagManager.instance.uri_for(mallory),
+ ]
+ end
+
+ let(:payload) do
+ {
+ '@context': 'https://www.w3.org/ns/activitystreams',
+ type: 'Collection',
+ id: collection_uri,
+ items: items,
+ }.with_indifferent_access
+ end
+
+ subject { described_class.new }
+
+ shared_examples 'synchronizes followers' do
+ before do
+ alice.follow!(actor)
+ bob.follow!(actor)
+ mallory.request_follow!(actor)
+
+ allow(ActivityPub::DeliveryWorker).to receive(:perform_async)
+
+ subject.call(actor, collection_uri)
+ end
+
+ it 'keeps expected followers' do
+ expect(alice.following?(actor)).to be true
+ end
+
+ it 'removes local followers not in the remote list' do
+ expect(bob.following?(actor)).to be false
+ end
+
+ it 'converts follow requests to follow relationships when they have been accepted' do
+ expect(mallory.following?(actor)).to be true
+ end
+
+ it 'sends an Undo Follow to the actor' do
+ expect(ActivityPub::DeliveryWorker).to have_received(:perform_async).with(anything, eve.id, actor.inbox_url)
+ end
+ end
+
+ describe '#call' do
+ context 'when the endpoint is a Collection of actor URIs' do
+ before do
+ stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
+ end
+
+ it_behaves_like 'synchronizes followers'
+ end
+
+ context 'when the endpoint is an OrderedCollection of actor URIs' do
+ let(:payload) do
+ {
+ '@context': 'https://www.w3.org/ns/activitystreams',
+ type: 'OrderedCollection',
+ id: collection_uri,
+ orderedItems: items,
+ }.with_indifferent_access
+ end
+
+ before do
+ stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
+ end
+
+ it_behaves_like 'synchronizes followers'
+ end
+
+ context 'when the endpoint is a paginated Collection of actor URIs' do
+ let(:payload) do
+ {
+ '@context': 'https://www.w3.org/ns/activitystreams',
+ type: 'Collection',
+ id: collection_uri,
+ first: {
+ type: 'CollectionPage',
+ partOf: collection_uri,
+ items: items,
+ }
+ }.with_indifferent_access
+ end
+
+ before do
+ stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
+ end
+
+ it_behaves_like 'synchronizes followers'
+ end
+ end
+end
require 'rails_helper'
describe ActivityPub::DeliveryWorker do
+ include RoutingHelper
+
subject { described_class.new }
let(:sender) { Fabricate(:account) }
let(:payload) { 'test' }
+ before do
+ allow_any_instance_of(Account).to receive(:remote_followers_hash).with('https://example.com/').and_return('somehash')
+ end
+
describe 'perform' do
it 'performs a request' do
stub_request(:post, 'https://example.com/api').to_return(status: 200)
- subject.perform(payload, sender.id, 'https://example.com/api')
- expect(a_request(:post, 'https://example.com/api')).to have_been_made.once
+ subject.perform(payload, sender.id, 'https://example.com/api', { synchronize_followers: true })
+ expect(a_request(:post, 'https://example.com/api').with(headers: { 'Collection-Synchronization' => "collectionId=\"#{account_followers_url(sender)}\", digest=\"somehash\", url=\"#{account_followers_synchronization_url(sender)}\"" })).to have_been_made.once
end
it 'raises when request fails' do