end
def direct_timeline_statuses
- # this query requires built in pagination.
- Status.as_direct_timeline(
- current_account,
+ account_direct_feed.get(
limit_param(DEFAULT_STATUSES_LIMIT),
params[:max_id],
params[:since_id],
- true # returns array of cache_ids object
+ params[:min_id]
)
end
+ def account_direct_feed
+ DirectFeed.new(current_account)
+ end
+
def insert_pagination_headers
set_pagination_headers(next_path, prev_path)
end
filter_from_home?(status, receiver_id)
elsif timeline_type == :mentions
filter_from_mentions?(status, receiver_id)
+ elsif timeline_type == :direct
+ filter_from_direct?(status, receiver_id)
else
false
end
true
end
+ def push_to_direct(account, status)
+ return false unless add_to_feed(:direct, account.id, status)
+ trim(:direct, account.id)
+ PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}")
+ true
+ end
+
+ def unpush_from_direct(account, status)
+ return false unless remove_from_feed(:direct, account.id, status)
+ redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s))
+ end
+
def trim(type, account_id)
timeline_key = key(type, account_id)
reblog_key = key(type, account_id, 'reblogs')
end
end
+ def populate_direct_feed(account)
+ added = 0
+ limit = FeedManager::MAX_ITEMS / 2
+ max_id = nil
+
+ loop do
+ statuses = Status.as_direct_timeline(account, limit, max_id)
+
+ break if statuses.empty?
+
+ statuses.each do |status|
+ next if filter_from_direct?(status, account)
+ added += 1 if add_to_feed(:direct, account.id, status)
+ end
+
+ break unless added.zero?
+
+ max_id = statuses.last.id
+ end
+ end
+
private
def push_update_required?(timeline_id)
should_filter
end
+ def filter_from_direct?(status, receiver_id)
+ return false if receiver_id == status.account_id
+ filter_from_mentions?(status, receiver_id)
+ end
+
def phrase_filtered?(status, receiver_id, context)
active_filters = Rails.cache.fetch("filters:#{receiver_id}") { CustomFilter.where(account_id: receiver_id).active_irreversible.to_a }.to_a
--- /dev/null
+# frozen_string_literal: true
+
+class DirectFeed < Feed
+ include Redisable
+
+ def initialize(account)
+ @type = :direct
+ @id = account.id
+ @account = account
+ end
+
+ def get(limit, max_id = nil, since_id = nil, min_id = nil)
+ unless redis.exists("account:#{@account.id}:regeneration")
+ statuses = super
+ return statuses unless statuses.empty?
+ end
+ from_database(limit, max_id, since_id, min_id)
+ end
+
+ private
+
+ def from_database(limit, max_id, since_id, min_id)
+ loop do
+ statuses = Status.as_direct_timeline(@account, limit, max_id, since_id, min_id)
+ return statuses if statuses.empty?
+ max_id = statuses.last.id
+ statuses = statuses.reject { |status| FeedManager.instance.filter?(:direct, status, @account.id) }
+ return statuses unless statuses.empty?
+ end
+ end
+end
payload = @json_payloads[status.id]
redis.pipelined do
@mentions[status.id].each do |mention|
- redis.publish("timeline:direct:#{mention.account.id}", payload) if mention.account.local?
+ FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local?
end
- redis.publish("timeline:direct:#{status.account.id}", payload) if status.account.local?
+ FeedManager.instance.unpush_from_direct(status.account, status) if status.account.local?
end
end
def deliver_to_self(status)
Rails.logger.debug "Delivering status #{status.id} to author"
FeedManager.instance.push_to_home(status.account, status)
+ FeedManager.instance.push_to_direct(status.account, status) if status.direct_visibility?
end
def deliver_to_followers(status)
def deliver_to_direct_timelines(status)
Rails.logger.debug "Delivering status #{status.id} to direct timelines"
- status.mentions.includes(:account).each do |mention|
- Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
+ FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account|
+ [status.id, account.id, :direct]
end
-
- Redis.current.publish("timeline:direct:#{status.account.id}", @payload) if status.account.local?
end
def deliver_to_own_conversation(status)
class PrecomputeFeedService < BaseService
def call(account)
FeedManager.instance.populate_feed(account)
+ FeedManager.instance.populate_direct_feed(account)
ensure
Redis.current.del("account:#{account.id}:regeneration")
end
def remove_from_self
FeedManager.instance.unpush_from_home(@account, @status)
+ FeedManager.instance.unpush_from_direct(@account, @status) if @status.direct_visibility?
end
def remove_from_followers
def remove_from_direct
@mentions.each do |mention|
- Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
+ FeedManager.instance.unpush_from_direct(mention.account, @status) if mention.account.local?
end
- Redis.current.publish("timeline:direct:#{@account.id}", @payload) if @account.local?
end
def lock_options
when :list
@list = List.find(id)
@follower = @list.account
+ when :direct
+ @account = Account.find(id)
end
check_and_insert
def feed_filtered?
# Note: Lists are a variation of home, so the filtering rules
# of home apply to both
- FeedManager.instance.filter?(:home, @status, @follower.id)
+ case @type
+ when :home, :list
+ FeedManager.instance.filter?(:home, @status, @follower.id)
+ when :direct
+ FeedManager.instance.filter?(:direct, @status, @account.id)
+ end
end
def perform_push
FeedManager.instance.push_to_home(@follower, @status)
when :list
FeedManager.instance.push_to_list(@list, @status)
+ when :direct
+ FeedManager.instance.push_to_direct(@account, @status)
end
end
end
def perform
clean_home_feeds!
clean_list_feeds!
+ clean_direct_feeds!
end
private
clean_feeds!(inactive_list_ids, :list)
end
+ def clean_direct_feeds!
+ clean_feeds!(inactive_account_ids, :direct)
+ end
+
def clean_feeds!(ids, type)
reblogged_id_sets = {}