]> cat aescling's git repositories - mastodon.git/commitdiff
More robust PuSH subscription refreshes (#2799)
author Eugen Rochko <eugen@zeonfederated.com>
Fri, 5 May 2017 00:23:01 +0000 (02:23 +0200)
committer GitHub <noreply@github.com>
Fri, 5 May 2017 00:23:01 +0000 (02:23 +0200)
* Fix #2473 - Use sidekiq scheduler to refresh PuSH subscriptions instead of cron

Fix an issue where / in domain would raise exception in TagManager#normalize_domain

PuSH subscription refreshes are done in a round-robin way to avoid hammering a single
server's hub in sequence. Correct handling of failures/retries through Sidekiq (see
also #2613). Optimize Account#with_followers scope. Also, since subscriptions
are now delegated to Sidekiq jobs, an uncaught exception will not stop the entire
refreshing operation halfway through.

Fix #2702 - Correct user agent header on outgoing http requests

* Add test for SubscribeService

* Extract #expiring_accounts into method

* Make mastodon:push:refresh no-op

* Queues are now defined in sidekiq.yml

* Queues are now in sidekiq.yml

19 files changed:
Gemfile
Gemfile.lock
Procfile
Procfile.dev
app/helpers/http_helper.rb
app/lib/tag_manager.rb
app/models/account.rb
app/services/follow_service.rb
app/services/process_interaction_service.rb
app/services/subscribe_service.rb
app/services/update_remote_profile_service.rb
app/workers/pubsubhubbub/delivery_worker.rb
app/workers/pubsubhubbub/subscribe_worker.rb [new file with mode: 0644]
app/workers/scheduler/subscriptions_scheduler.rb [new file with mode: 0644]
config/environments/development.rb
config/sidekiq.yml
lib/tasks/mastodon.rake
spec/services/follow_service_spec.rb
spec/services/subscribe_service_spec.rb [new file with mode: 0644]

diff --git a/Gemfile b/Gemfile
index 25979d0a41c6fe422c0a25013c9b5686537a3469..54fb972ab16eb92a278064bf3f5199151a93db71 100644 (file)
--- a/Gemfile
+++ b/Gemfile
@@ -35,7 +35,7 @@ gem 'link_header'
 gem 'local_time'
 gem 'nokogiri'
 gem 'oj'
-gem 'ostatus2', '~> 1.1'
+gem 'ostatus2', '~> 2.0'
 gem 'ox'
 gem 'rabl'
 gem 'rack-attack'
@@ -48,6 +48,7 @@ gem 'rqrcode'
 gem 'ruby-oembed', require: 'oembed'
 gem 'sanitize'
 gem 'sidekiq'
+gem 'sidekiq-scheduler'
 gem 'sidekiq-unique-jobs'
 gem 'simple-navigation'
 gem 'simple_form'
index aedf83433353a334f42ddf506f4f3aaa1a87ad44..10b47082cdb2f2b70020f70fe19774141c25c470 100644 (file)
@@ -143,6 +143,8 @@ GEM
       thread_safe
     encryptor (3.0.0)
     erubis (2.7.0)
+    et-orbi (1.0.3)
+      tzinfo
     execjs (2.7.0)
     fabrication (2.16.1)
     faker (1.7.3)
@@ -251,7 +253,7 @@ GEM
     oj (3.0.5)
     openssl (2.0.3)
     orm_adapter (0.5.0)
-    ostatus2 (1.1.0)
+    ostatus2 (2.0.0)
       addressable (~> 2.4)
       http (~> 2.0)
       nokogiri (~> 1.6)
@@ -386,6 +388,8 @@ GEM
       unicode-display_width (~> 1.0, >= 1.0.1)
     ruby-oembed (0.12.0)
     ruby-progressbar (1.8.1)
+    rufus-scheduler (3.4.0)
+      et-orbi (~> 1.0)
     safe_yaml (1.0.4)
     sanitize (4.4.0)
       crass (~> 1.0.2)
@@ -396,6 +400,11 @@ GEM
       connection_pool (~> 2.2, >= 2.2.0)
       rack-protection (>= 1.5.0)
       redis (~> 3.3, >= 3.3.3)
+    sidekiq-scheduler (2.1.4)
+      redis (~> 3)
+      rufus-scheduler (~> 3.2)
+      sidekiq (>= 3)
+      tilt (>= 1.4.0)
     sidekiq-unique-jobs (5.0.7)
       sidekiq (>= 4.0, <= 6.0)
       thor (~> 0)
@@ -499,7 +508,7 @@ DEPENDENCIES
   microformats2
   nokogiri
   oj
-  ostatus2 (~> 1.1)
+  ostatus2 (~> 2.0)
   ox
   paperclip (~> 5.1)
   paperclip-av-transcoder
@@ -527,6 +536,7 @@ DEPENDENCIES
   ruby-oembed
   sanitize
   sidekiq
+  sidekiq-scheduler
   sidekiq-unique-jobs
   simple-navigation
   simple_form
index 646e26059e0918d5d1bf99da23ddd8282cc1c35c..b18e4b6be55cd92e9313933845aeced388c0fd60 100644 (file)
--- a/Procfile
+++ b/Procfile
@@ -1,2 +1,2 @@
 web: bundle exec puma -C config/puma.rb
-worker: bundle exec sidekiq -q default -q push -q pull -q mailers
+worker: bundle exec sidekiq
index 87da7a7f481a24e36e67d83b543a86ec769f10ab..80666280dcf3bb8652a24ef6ee76e92df87824ad 100644 (file)
@@ -1,3 +1,4 @@
 web: PORT=3000 bundle exec puma -C config/puma.rb
+sidekiq: bundle exec sidekiq
 stream: PORT=4000 yarn run start
 webpack: ./bin/webpack-dev-server --host 0.0.0.0
index 1697de746ea6398601887e534a79566bedb5dff4..e39a52da01fe451d3efe02f6a4afd94a6f2a786d 100644 (file)
@@ -1,13 +1,17 @@
 # frozen_string_literal: true
 
 module HttpHelper
-  USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
-
   def http_client(options = {})
     timeout = { write: 10, connect: 10, read: 10 }.merge(options)
 
-    HTTP.headers(user_agent: USER_AGENT)
+    HTTP.headers(user_agent: user_agent)
         .timeout(:per_operation, timeout)
         .follow
   end
+
+  private
+
+  def user_agent
+    @user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
+  end
 end
index 3bddfba7c2825ef6f19d80599627922dfc1a9248..6170a90def5c9eedf3060f678a73dab7b4591e18 100644 (file)
@@ -65,8 +65,10 @@ class TagManager
   end
 
   def normalize_domain(domain)
+    return if domain.nil?
+
     uri = Addressable::URI.new
-    uri.host = domain
+    uri.host = domain.gsub(/[\/]/, '')
     uri.normalize.host
   end
 
index d64591641c1a7adbe8a99daa6e64b1661ee74c4c..87b97a20d21827d3184af0d7d8561ec5f79613fe 100644 (file)
@@ -103,9 +103,10 @@ class Account < ApplicationRecord
 
   scope :remote, -> { where.not(domain: nil) }
   scope :local, -> { where(domain: nil) }
-  scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') }
-  scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') }
+  scope :without_followers, -> { where(followers_count: 0) }
+  scope :with_followers, -> { where('followers_count > 0') }
   scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers }
+  scope :partitioned, -> { order('row_number() over (partition by domain)') }
   scope :silenced, -> { where(silenced: true) }
   scope :suspended, -> { where(suspended: true) }
   scope :recent, -> { reorder(id: :desc) }
index 844f5282d4616fe018aa4c918e5d863760b48c82..23e721fac6b46207a8a1cbc6ca5f1e97372fbd42 100644 (file)
@@ -40,7 +40,7 @@ class FollowService < BaseService
     if target_account.local?
       NotifyService.new.call(target_account, follow)
     else
-      SubscribeService.new.call(target_account) unless target_account.subscribed?
+      Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed?
       NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id)
       AfterRemoteFollowWorker.perform_async(follow.id)
     end
index 1f15a265d57bc80e63b19c43365f6a9d88058849..16eac235390b666c30255c3d787c0acef918f012 100644 (file)
@@ -77,7 +77,7 @@ class ProcessInteractionService < BaseService
   def authorize_follow_request!(account, target_account)
     follow_request = FollowRequest.find_by(account: target_account, target_account: account)
     follow_request&.authorize!
-    SubscribeService.new.call(account) unless account.subscribed?
+    Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed?
   end
 
   def reject_follow_request!(account, target_account)
index 820b208e9936424ac616025bf10d2359745f8be3..138718f14ca12cbc3428d2b4c5247f586669da24 100644 (file)
@@ -5,15 +5,31 @@ class SubscribeService < BaseService
     account.secret = SecureRandom.hex
 
     subscription = account.subscription(api_subscription_url(account.id))
-    response = subscription.subscribe
+    response     = subscription.subscribe
 
-    unless response.successful?
+    if response_failed_permanently?(response)
+      # An error in the 4xx range (except for 429, which is rate limiting)
+      # means we're not allowed to subscribe. Fail and move on
       account.secret = ''
-      Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}"
+      account.save!
+    elsif response_successful?(response)
+      # Anything in the 2xx range means the subscription will be confirmed
+      # asynchronously, we've done what we needed to do
+      account.save!
+    else
+      # What's left is the 5xx range and 429, which means we need to retry
+      # at a later time. Fail loudly!
+      raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}"
     end
+  end
+
+  private
+
+  def response_failed_permanently?(response)
+    response.code > 299 && response.code < 500 && response.code != 429
+  end
 
-    account.save!
-  rescue HTTP::Error, OpenSSL::SSL::SSLError
-    Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error"
+  def response_successful?(response)
+    response.code > 199 && response.code < 300
   end
 end
index 31f4af2c1222197cc1ca62bfebc0addb4963f015..f0c39ecc0a10184fb7cb6787e885aee5aaf8c776 100644 (file)
@@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService
 
     account.save_with_optional_avatar!
 
-    SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url)
+    Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url)
   end
 
   private
index f645b1e24d0189b4d387970135c667331ca90196..511ae14b3f7992fd01d96238f8244893a18513d9 100644 (file)
@@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker
                    .headers(headers)
                    .post(subscription.callback_url, body: payload)
 
-    return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling)
-    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
+    return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling)
+    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response)
 
     subscription.touch(:last_successful_delivery_at)
   end
@@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker
     hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
     "sha1=#{hmac}"
   end
+
+  def response_failed_permanently?(response)
+    response.code > 299 && response.code < 500 && response.code != 429
+  end
+
+  def response_successful?(response)
+    response.code > 199 && response.code < 300
+  end
 end
diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb
new file mode 100644 (file)
index 0000000..0c4111a
--- /dev/null
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::SubscribeWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push'
+
+  def perform(account_id)
+    account = Account.find(account_id)
+    Rails.logger.debug "PuSH re-subscribing to #{account.acct}"
+    ::SubscribeService.new.call(account)
+  end
+end
diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb
new file mode 100644 (file)
index 0000000..03622e9
--- /dev/null
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+require 'sidekiq-scheduler'
+
+# Scheduled Sidekiq job that enqueues one Pubsubhubbub::SubscribeWorker per
+# account returned by #expiring_accounts.
+class Scheduler::SubscriptionsScheduler
+  include Sidekiq::Worker
+
+  def perform
+    Rails.logger.debug 'Queueing PuSH re-subscriptions'
+
+    # pluck does not yield to a block, so iterate over the returned ids
+    # explicitly; otherwise no job is ever enqueued.
+    expiring_accounts.pluck(:id).each do |id|
+      Pubsubhubbub::SubscribeWorker.perform_async(id)
+    end
+  end
+
+  private
+
+  # Accounts selected by Account.expiring with a one-day horizon, ordered by
+  # the Account.partitioned scope.
+  def expiring_accounts
+    Account.expiring(1.day.from_now).partitioned
+  end
+end
index 58e8a0728271c001f75fba7166d4eea7b8504e36..c20d08493b6d5b578b8cfa459ff2059d539eda95 100644 (file)
@@ -69,7 +69,4 @@ Rails.application.configure do
   end
 end
 
-require 'sidekiq/testing'
-Sidekiq::Testing.inline!
-
 ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false }
index ee32b631754c2a522519a5bc3aa3edd216e1ad23..5c700cb27aadd1087af099e507584e461414acea 100644 (file)
@@ -1,2 +1,11 @@
 ---
 :concurrency: 5
+:queues:
+  - default
+  - push
+  - pull
+  - mailers
+:schedule:
+  subscriptions_scheduler:
+    cron: '0 5 * * *'
+    class: Scheduler::SubscriptionsScheduler
index 2cc1c29eb727daceec076eb02d9a54a75dacdf25..290b28098f9f6a02c637e94c1c06830d2de986f5 100644 (file)
@@ -77,10 +77,8 @@ namespace :mastodon do
 
     desc 'Re-subscribes to soon expiring PuSH subscriptions'
     task refresh: :environment do
-      Account.expiring(1.day.from_now).find_each do |a|
-        Rails.logger.debug "PuSH re-subscribing to #{a.acct}"
-        SubscribeService.new.call(a)
-      end
+      # No-op
+      # This task is now executed via sidekiq-scheduler
     end
   end
 
index 2ce0fa464e93e0c6e8eef6ac7a13090a0431131f..bda5daee13d191bbda82e3c7bbbd58c26ccc96a1 100644 (file)
@@ -53,10 +53,11 @@ RSpec.describe FollowService do
     end
 
     describe 'unlocked account' do
-      let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com')).account }
+      let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com', hub_url: 'http://hub.example.com')).account }
 
       before do
         stub_request(:post, "http://salmon.example.com/").to_return(:status => 200, :body => "", :headers => {})
+        stub_request(:post, "http://hub.example.com/").to_return(status: 202)
         subject.call(sender, bob.acct)
       end
 
@@ -70,6 +71,10 @@ RSpec.describe FollowService do
           xml.match(TagManager::VERBS[:follow])
         }).to have_been_made.once
       end
+
+      it 'subscribes to PuSH' do
+        expect(a_request(:post, "http://hub.example.com/")).to have_been_made.once
+      end
     end
   end
 end
diff --git a/spec/services/subscribe_service_spec.rb b/spec/services/subscribe_service_spec.rb
new file mode 100644 (file)
index 0000000..8cf0100
--- /dev/null
@@ -0,0 +1,40 @@
+require 'rails_helper'
+
+RSpec.describe SubscribeService do
+  let(:account) { Fabricate(:account, username: 'bob', domain: 'example.com', hub_url: 'http://hub.example.com') }
+  subject { SubscribeService.new }
+
+  it 'sends subscription request to PuSH hub' do
+    stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
+    subject.call(account)
+    expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once
+  end
+
+  it 'generates and keeps PuSH secret on successful call' do
+    stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
+    subject.call(account)
+    expect(account.secret).to_not be_blank
+  end
+
+  it 'fails silently if PuSH hub forbids subscription' do
+    stub_request(:post, 'http://hub.example.com/').to_return(status: 403)
+    subject.call(account)
+  end
+
+  it 'fails silently if PuSH hub is not found' do
+    stub_request(:post, 'http://hub.example.com/').to_return(status: 404)
+    subject.call(account)
+  end
+
+  it 'fails loudly if there is a network error' do
+    stub_request(:post, 'http://hub.example.com/').to_raise(HTTP::Error)
+    expect { subject.call(account) }.to raise_error HTTP::Error
+  end
+
+  it 'fails loudly if PuSH hub is unavailable' do
+    stub_request(:post, 'http://hub.example.com/').to_return(status: 503)
+    # Name the expected error: a bare raise_error also passes on unrelated
+    # exceptions such as NoMethodError, which would mask a broken test.
+    expect { subject.call(account) }.to raise_error(RuntimeError, /Subscription attempt failed/)
+  end
+end