include RoutingHelper
include Authorization
include Redisable
+ include Lockable
skip_before_action :store_current_location
skip_before_action :require_functional!
rescue_from HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError, with: :internal_server_error
def show
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- @media_attachment = MediaAttachment.remote.attached.find(params[:id])
- authorize @media_attachment.status, :show?
- redownload! if @media_attachment.needs_redownload? && !reject_media?
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("media_download:#{params[:id]}") do
+ @media_attachment = MediaAttachment.remote.attached.find(params[:id])
+ authorize @media_attachment.status, :show?
+ redownload! if @media_attachment.needs_redownload? && !reject_media?
end
redirect_to full_asset_url(@media_attachment.file.url(version))
end
end
- def lock_options
- { redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds }
- end
-
def reject_media?
DomainBlock.reject_media?(@media_attachment.account.domain)
end
class Settings::ExportsController < Settings::BaseController
include Authorization
include Redisable
+ include Lockable
skip_before_action :require_functional!
def create
backup = nil
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- authorize :backup, :create?
- backup = current_user.backups.create!
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("backup:#{current_user.id}") do
+ authorize :backup, :create?
+ backup = current_user.backups.create!
end
BackupWorker.perform_async(backup.id)
redirect_to settings_export_path
end
-
- def lock_options
- { redis: redis, key: "backup:#{current_user.id}" }
- end
end
class ActivityPub::Activity
include JsonLdHelper
include Redisable
+ include Lockable
SUPPORTED_TYPES = %w(Note Question).freeze
CONVERTED_TYPES = %w(Image Audio Video Article Page Event).freeze
end
end
- def lock_or_return(key, expire_after = 2.hours.seconds)
- yield if redis.set(key, true, nx: true, ex: expire_after)
- ensure
- redis.del(key)
- end
-
- def lock_or_fail(key, expire_after = 15.minutes.seconds)
- RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock|
- if lock.acquired?
- yield
- else
- raise Mastodon::RaceConditionError
- end
- end
- end
-
def fetch?
!@options[:delivery]
end
def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
- lock_or_fail("announce:#{@object['id']}") do
+ with_lock("announce:#{@object['id']}") do
original_status = status_from_object
return reject_payload! if original_status.nil? || !announceable?(original_status)
def create_status
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
- lock_or_fail("create:#{object_uri}") do
+ with_lock("create:#{object_uri}") do
return if delete_arrived_first?(object_uri) || poll_vote?
@status = find_existing_status
poll = replied_to_status.preloadable_poll
already_voted = true
- lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
+ with_lock("vote:#{replied_to_status.poll_id}:#{@account.id}") do
already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
end
private
def delete_person
- lock_or_return("delete_in_progress:#{@account.id}") do
+ with_lock("delete_in_progress:#{@account.id}", autorelease: 2.hours, raise_on_failure: false) do
DeleteAccountService.new.call(@account, reserve_username: false, skip_activitypub: true)
end
end
def delete_note
return if object_uri.nil?
- lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
+ with_lock("delete_status_in_progress:#{object_uri}", raise_on_failure: false) do
unless invalid_origin?(object_uri)
# This lock ensures a concurrent `ActivityPub::Activity::Create` either
# does not create a status at all, or has finished saving it to the
# database before we try to load it.
# Without the lock, `delete_later!` could be called after `delete_arrived_first?`
# and `Status.find` before `Status.create!`
- lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
+ with_lock("create:#{object_uri}") { delete_later!(object_uri) }
Tombstone.find_or_create_by(uri: object_uri, account: @account)
end
class AccountMigration < ApplicationRecord
include Redisable
+ include Lockable
COOLDOWN_PERIOD = 30.days.freeze
return false unless errors.empty?
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- save
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("account_migration:#{account.id}") do
+ save
end
end
def validate_migration_cooldown
errors.add(:base, I18n.t('migrations.errors.on_cooldown')) if account.migrations.within_cooldown.exists?
end
-
- def lock_options
- { redis: redis, key: "account_migration:#{account.id}" }
- end
end
--- /dev/null
+# frozen_string_literal: true
+
+module Lockable
+ # @param [String] lock_name
+ # @param [ActiveSupport::Duration] autorelease Automatically release the lock after this time
+ # @param [Boolean] raise_on_failure Raise an error if a lock cannot be acquired, or fail silently
+ # @raise [Mastodon::RaceConditionError]
+ def with_lock(lock_name, autorelease: 15.minutes, raise_on_failure: true)
+ with_redis do |redis|
+ RedisLock.acquire(redis: redis, key: "lock:#{lock_name}", autorelease: autorelease.seconds) do |lock|
+ if lock.acquired?
+ yield
+ elsif raise_on_failure
+ raise Mastodon::RaceConditionError, "Could not acquire lock for #{lock_name}, try again later"
+ end
+ end
+ end
+ end
+end
# frozen_string_literal: true
module Redisable
- extend ActiveSupport::Concern
-
- private
-
def redis
Thread.current[:redis] ||= RedisConfiguration.pool.checkout
end
+
+ def with_redis(&block)
+ RedisConfiguration.with(&block)
+ end
end
include JsonLdHelper
include DomainControlHelper
include Redisable
+ include Lockable
# Should be called with confirmed valid JSON
# and WebFinger-resolved username and domain
@domain = domain
@collections = {}
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- @account = Account.remote.find_by(uri: @uri) if @options[:only_key]
- @account ||= Account.find_remote(@username, @domain)
- @old_public_key = @account&.public_key
- @old_protocol = @account&.protocol
- @suspension_changed = false
-
- create_account if @account.nil?
- update_account
- process_tags
-
- process_duplicate_accounts! if @options[:verified_webfinger]
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("process_account:#{@uri}") do
+ @account = Account.remote.find_by(uri: @uri) if @options[:only_key]
+ @account ||= Account.find_remote(@username, @domain)
+ @old_public_key = @account&.public_key
+ @old_protocol = @account&.protocol
+ @suspension_changed = false
+
+ create_account if @account.nil?
+ update_account
+ process_tags
+
+ process_duplicate_accounts! if @options[:verified_webfinger]
end
return if @account.nil?
!@old_protocol.nil? && @old_protocol != @account.protocol
end
- def lock_options
- { redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
- end
-
def process_tags
return if @json['tag'].blank?
class ActivityPub::ProcessStatusUpdateService < BaseService
include JsonLdHelper
include Redisable
+ include Lockable
def call(status, json)
raise ArgumentError, 'Status has unsaved changes' if status.changed?
last_edit_date = @status.edited_at.presence || @status.created_at
# Only allow processing one create/update per status at a time
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- Status.transaction do
- record_previous_edit!
- update_media_attachments!
- update_poll!
- update_immediate_attributes!
- update_metadata!
- create_edits!
- end
+ with_lock("create:#{@uri}") do
+ Status.transaction do
+ record_previous_edit!
+ update_media_attachments!
+ update_poll!
+ update_immediate_attributes!
+ update_metadata!
+ create_edits!
+ end
- queue_poll_notifications!
+ queue_poll_notifications!
- next unless significant_changes?
+ next unless significant_changes?
- reset_preview_card!
- broadcast_updates!
- else
- raise Mastodon::RaceConditionError
- end
+ reset_preview_card!
+ broadcast_updates!
end
forward_activity! if significant_changes? && @status_parser.edited_at > last_edit_date
end
def handle_implicit_update!
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- update_poll!(allow_significant_changes: false)
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("create:#{@uri}") do
+ update_poll!(allow_significant_changes: false)
+ queue_poll_notifications!
end
-
- queue_poll_notifications!
end
def update_media_attachments!
equals_or_includes_any?(@json['type'], %w(Note Question))
end
- def lock_options
- { redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
- end
-
def record_previous_edit!
@previous_edit = @status.build_snapshot(at_time: @status.created_at, rate_limit: false) if @status.edits.empty?
end
class FetchLinkCardService < BaseService
include Redisable
+ include Lockable
URL_PATTERN = %r{
(#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars
@url = @original_url.to_s
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- @card = PreviewCard.find_by(url: @url)
- process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("fetch:#{@original_url}") do
+ @card = PreviewCard.find_by(url: @url)
+ process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
end
attach_card if @card&.persisted?
@card.assign_attributes(link_details_extractor.to_preview_card_attributes)
@card.save_with_optional_image! unless @card.title.blank? && @card.html.blank?
end
-
- def lock_options
- { redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds }
- end
end
class RemoveStatusService < BaseService
include Redisable
include Payloadable
+ include Lockable
# Delete a status
# @param [Status] status
@account = status.account
@options = options
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- @status.discard
-
- remove_from_self if @account.local?
- remove_from_followers
- remove_from_lists
-
- # There is no reason to send out Undo activities when the
- # cause is that the original object has been removed, since
- # original object being removed implicitly removes reblogs
- # of it. The Delete activity of the original is forwarded
- # separately.
- remove_from_remote_reach if @account.local? && !@options[:original_removed]
-
- # Since reblogs don't mention anyone, don't get reblogged,
- # favourited and don't contain their own media attachments
- # or hashtags, this can be skipped
- unless @status.reblog?
- remove_from_mentions
- remove_reblogs
- remove_from_hashtags
- remove_from_public
- remove_from_media if @status.with_media?
- remove_from_direct if status.direct_visibility?
- remove_media
- end
-
- @status.destroy! if permanently?
- else
- raise Mastodon::RaceConditionError
+ with_lock("distribute:#{@status.id}") do
+ @status.discard
+
+ remove_from_self if @account.local?
+ remove_from_followers
+ remove_from_lists
+
+ # There is no reason to send out Undo activities when the
+ # cause is that the original object has been removed, since
+ # original object being removed implicitly removes reblogs
+ # of it. The Delete activity of the original is forwarded
+ # separately.
+ remove_from_remote_reach if @account.local? && !@options[:original_removed]
+
+ # Since reblogs don't mention anyone, don't get reblogged,
+ # favourited and don't contain their own media attachments
+ # or hashtags, this can be skipped
+ unless @status.reblog?
+ remove_from_mentions
+ remove_reblogs
+ remove_from_hashtags
+ remove_from_public
+ remove_from_media if @status.with_media?
+ remove_from_direct if status.direct_visibility?
+ remove_media
end
+
+ @status.destroy! if permanently?
end
end
def permanently?
@options[:immediate] || !(@options[:preserve] || @status.reported?)
end
-
- def lock_options
- { redis: redis, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds }
- end
end
include DomainControlHelper
include WebfingerHelper
include Redisable
+ include Lockable
# Find or create an account record for a remote user. When creating,
# look up the user's webfinger and fetch ActivityPub data
def fetch_account!
return unless activitypub_ready?
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- @account = ActivityPub::FetchRemoteAccountService.new.call(actor_url)
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("resolve:#{@username}@#{@domain}") do
+ @account = ActivityPub::FetchRemoteAccountService.new.call(actor_url)
end
@account
@account.suspend!(origin: :remote)
AccountDeletionWorker.perform_async(@account.id, { 'reserve_username' => false, 'skip_activitypub' => true })
end
-
- def lock_options
- { redis: redis, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds }
- end
end
include Authorization
include Payloadable
include Redisable
+ include Lockable
def call(account, poll, choices)
authorize_with account, poll, :vote?
already_voted = true
- RedisLock.acquire(lock_options) do |lock|
- if lock.acquired?
- already_voted = @poll.votes.where(account: @account).exists?
+ with_lock("vote:#{@poll.id}:#{@account.id}") do
+ already_voted = @poll.votes.where(account: @account).exists?
- ApplicationRecord.transaction do
- @choices.each do |choice|
- @votes << @poll.votes.create!(account: @account, choice: Integer(choice))
- end
+ ApplicationRecord.transaction do
+ @choices.each do |choice|
+ @votes << @poll.votes.create!(account: @account, choice: Integer(choice))
end
- else
- raise Mastodon::RaceConditionError
end
end
@poll.reload
retry
end
-
- def lock_options
- { redis: redis, key: "vote:#{@poll.id}:#{@account.id}" }
- end
end
class DistributionWorker
include Sidekiq::Worker
include Redisable
+ include Lockable
def perform(status_id, options = {})
- RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
- if lock.acquired?
- FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
- else
- raise Mastodon::RaceConditionError
- end
+ with_lock("distribute:#{status_id}") do
+ FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
end
rescue ActiveRecord::RecordNotFound
true