end
end
+ # Populate direct feed of account from scratch
+ # @param [Account] account
+ # @return [void]
+ def populate_direct_feed(account)
+ added = 0
+ # Only fill half of the maximum feed size, leaving headroom for new items
+ limit = FeedManager::MAX_ITEMS / 2
+ max_id = nil
+
+ loop do
+ # Page through the account's direct-visibility statuses, newest first
+ statuses = Status.as_direct_timeline(account, limit, max_id)
+
+ break if statuses.empty?
+
+ statuses.each do |status|
+ next if filter_from_direct?(status, account)
+ added += 1 if add_to_feed(:direct, account.id, status)
+ end
+
+ # Keep paging only while nothing has been added yet; stop as soon
+ # as at least one status made it past the filter into the feed
+ break unless added.zero?
+
+ max_id = statuses.last.id
+ end
+ end
+
+ # Completely clear multiple feeds at once
+ # @param [Symbol] type
+ # @param [Array<Integer>] ids
+ # @return [void]
+ def clean_feeds!(type, ids)
+ reblogged_id_sets = {}
+
+ # First pipeline: delete each feed and its reblog-tracking zset, while
+ # collecting the reblogged status ids as pipeline futures for a second pass.
+ redis.pipelined do
+ ids.each do |feed_id|
+ redis.del(key(type, feed_id))
+ reblog_key = key(type, feed_id, 'reblogs')
+ # We collect a future for this: we don't block while getting
+ # it, but we can iterate over it later.
+ reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
+ redis.del(reblog_key)
+ end
+ end
+
+ # Remove all of the reblog tracking keys we just removed the
+ # references to.
+ redis.pipelined do
+ reblogged_id_sets.each do |feed_id, future|
+ future.value.each do |reblogged_id|
+ reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}")
+ redis.del(reblog_set_key)
+ end
+ end
+ end
+ end
+
private
# Trim a feed to maximum size by removing older items
class BatchedRemoveStatusService < BaseService
include Redisable
- # Delete given statuses and reblogs of them
- # Remove statuses from home feeds
- # Push delete events to streaming API for home feeds and public feeds
- # @param [Enumerable<Status>] statuses A preferably batched array of statuses
+ # Delete multiple statuses and reblogs of them as efficiently as possible
+ # @param [Enumerable<Status>] statuses An array of statuses
# @param [Hash] options
- # @option [Boolean] :skip_side_effects
+ # @option [Boolean] :skip_side_effects Do not modify feeds and send updates to streaming API
def call(statuses, **options)
- statuses = Status.where(id: statuses.map(&:id)).includes(:account).flat_map { |status| [status] + status.reblogs.includes(:account).to_a }
+ # Eager-load only what each path needs: the side-effect-free path just
+ # needs reblogs; the full path also needs accounts and tags for streaming.
+ ActiveRecord::Associations::Preloader.new.preload(statuses, options[:skip_side_effects] ? :reblogs : [:account, :tags, reblogs: :account])
- @mentions = statuses.each_with_object({}) { |s, h| h[s.id] = s.active_mentions.includes(:account).to_a }
- @tags = statuses.each_with_object({}) { |s, h| h[s.id] = s.tags.pluck(:name) }
+ statuses_and_reblogs = statuses.flat_map { |status| [status] + status.reblogs }
- @json_payloads = statuses.each_with_object({}) { |s, h| h[s.id] = Oj.dump(event: :delete, payload: s.id.to_s) }
+ # The conversations for direct visibility statuses also need
+ # to be manually updated. This part is not efficient but we
+ # rely on direct visibility statuses being relatively rare.
+ statuses_with_account_conversations = statuses.select(&:direct_visibility?)
- statuses.each do |status|
- status.mark_for_mass_destruction!
- status.destroy
+ ActiveRecord::Associations::Preloader.new.preload(statuses_with_account_conversations, [mentions: :account])
+
+ statuses_with_account_conversations.each do |status|
+ status.send(:unlink_from_conversations)
++ unpush_from_direct_timelines(status)
end
+ # We do not batch all deletes into one to avoid having a long-running
+ # transaction lock the database, but we use the delete method instead
+ # of destroy to avoid all callbacks. We rely on foreign keys to
+ # cascade the delete faster without loading the associations.
+ statuses_and_reblogs.each_slice(50) { |slice| Status.where(id: slice.map(&:id)).delete_all }
+
+ # Since we skipped all callbacks, we also need to manually
+ # deindex the statuses
+ Chewy.strategy.current.update(StatusesIndex, statuses_and_reblogs) if Chewy.enabled?
+
return if options[:skip_side_effects]
# Batch by source account
end
def unpush_from_public_timelines(status)
- return unless status.public_visibility?
+ # NOTE(review): @status_id_cutoff is presumably set in #call to skip
+ # streaming delete events for sufficiently old statuses — not visible
+ # in this chunk, confirm it is assigned before this method runs.
+ return unless status.public_visibility? && status.id > @status_id_cutoff
- payload = @json_payloads[status.id]
+ # Streaming API delete event payload for this status id
+ payload = Oj.dump(event: :delete, payload: status.id.to_s)
- redis.pipelined do
- redis.publish('timeline:public', payload)
- if status.local?
- redis.publish('timeline:public:local', payload)
- else
- redis.publish('timeline:public:remote', payload)
- end
- if status.media_attachments.any?
- redis.publish('timeline:public:media', payload)
- if status.local?
- redis.publish('timeline:public:local:media', payload)
- else
- redis.publish('timeline:public:remote:media', payload)
- end
- end
+ redis.publish('timeline:public', payload)
+ redis.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
- @tags[status.id].each do |hashtag|
- redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", payload)
- redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", payload) if status.local?
- end
+ if status.media_attachments.any?
+ redis.publish('timeline:public:media', payload)
+ redis.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
+ end
+
+ # Downcase each tag name once instead of once per publish call
+ status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
+ redis.publish("timeline:hashtag:#{hashtag}", payload)
+ redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
end
end
- payload = @json_payloads[status.id]
- redis.pipelined do
- @mentions[status.id].each do |mention|
- FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local?
- end
- FeedManager.instance.unpush_from_direct(status.account, status) if status.account.local?
+
+ # Remove a direct-visibility status from the direct feeds of all
+ # local accounts mentioned in it.
+ # NOTE(review): unlike the removed code, the author's own feed
+ # (status.account) is no longer unpushed here — confirm intentional.
+ def unpush_from_direct_timelines(status)
++ status.mentions.each do |mention|
++ FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local?
+ end
+ end
end
end
# Clear the feeds of lists owned by inactive accounts; now delegates to
# FeedManager#clean_feeds! (note the argument order: type first, then ids).
def clean_list_feeds!
- clean_feeds!(inactive_list_ids, :list)
+ feed_manager.clean_feeds!(:list, inactive_list_ids)
end
- clean_feeds!(inactive_account_ids, :direct)
- end
-
- def clean_feeds!(ids, type)
- reblogged_id_sets = {}
-
- redis.pipelined do
- ids.each do |feed_id|
- redis.del(feed_manager.key(type, feed_id))
- reblog_key = feed_manager.key(type, feed_id, 'reblogs')
- # We collect a future for this: we don't block while getting
- # it, but we can iterate over it later.
- reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
- redis.del(reblog_key)
- end
- end
-
- # Remove all of the reblog tracking keys we just removed the
- # references to.
- redis.pipelined do
- reblogged_id_sets.each do |feed_id, future|
- future.value.each do |reblogged_id|
- reblog_set_key = feed_manager.key(type, feed_id, "reblogs:#{reblogged_id}")
- redis.del(reblog_set_key)
- end
- end
- end
+ # Clear direct feeds of inactive accounts; the feed-clearing logic
+ # moved from this scheduler into FeedManager#clean_feeds!
+ # (which also swapped the argument order to type-first).
+ def clean_direct_feeds!
++ feed_manager.clean_feeds!(:direct, inactive_account_ids)
+ end
+
+
# Memoized account ids of confirmed users considered inactive;
# these are the accounts whose direct feeds get purged.
def inactive_account_ids
@inactive_account_ids ||= User.confirmed.inactive.pluck(:account_id)
end