]> cat aescling's git repositories - mastodon.git/commitdiff
Change algorithm of `tootctl search deploy` to improve performance (#18463)
authorEugen Rochko <eugen@zeonfederated.com>
Sun, 22 May 2022 20:16:43 +0000 (22:16 +0200)
committersingle-right-quote <11325618-aescling@users.noreply.gitlab.com>
Fri, 27 May 2022 03:54:56 +0000 (23:54 -0400)
app/chewy/accounts_index.rb
app/chewy/statuses_index.rb
app/chewy/tags_index.rb
app/lib/importer/accounts_index_importer.rb [new file with mode: 0644]
app/lib/importer/base_importer.rb [new file with mode: 0644]
app/lib/importer/statuses_index_importer.rb [new file with mode: 0644]
app/lib/importer/tags_index_importer.rb [new file with mode: 0644]
app/models/trends/history.rb
lib/mastodon/search_cli.rb

index 763958a3f95363a1b8a0cfb00314ffb6dd1b03ff..e38e14a10699ad0bc9c48ad619faa0dd5a06c09d 100644 (file)
@@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index
     },
   }
 
-  index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? }
+  index_scope ::Account.searchable.includes(:account_stat)
 
   root date_detection: false do
     field :id, type: 'long'
@@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index
       field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
     end
 
-    field :following_count, type: 'long', value: ->(account) { account.following.local.count }
-    field :followers_count, type: 'long', value: ->(account) { account.followers.local.count }
+    field :following_count, type: 'long', value: ->(account) { account.following_count }
+    field :followers_count, type: 'long', value: ->(account) { account.followers_count }
     field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at }
   end
 end
index c20009879972db0a6b7d3330557e83bd26fff045..6dd4fb18b024dc204df2b4e2f722e2d6e6166fd7 100644 (file)
@@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index
     },
   }
 
+  # We do not use the delete_if option here because it would call, without
+  # crutches, a method that we expect to be called with them, causing N+1 queries
   index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll)
 
   crutch :mentions do |collection|
index a5b139bcaaceeb96e260351c0923888c1a901f37..df3d9e4cce2920e90a88f4b7e7acf186ed9f2731 100644 (file)
@@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index
     },
   }
 
-  index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? }
+  index_scope ::Tag.listable
+
+  crutch :time_period do
+    7.days.ago.to_date..0.days.ago.to_date
+  end
 
   root date_detection: false do
     field :name, type: 'text', analyzer: 'content' do
@@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index
     end
 
     field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? }
-    field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } }
+    field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts }
     field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at }
   end
 end
diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb
new file mode 100644 (file)
index 0000000..792a31b
--- /dev/null
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+class Importer::AccountsIndexImporter < Importer::BaseImporter # Imports searchable accounts into AccountsIndex
+  def import! # Walks the scope in batches of @batch_size; each batch is handed to one work unit
+    scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |accounts|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body
+
+        indexed = bulk.select { |entry| entry[:index] }.size # bulk entries carrying an :index action
+        deleted = bulk.select { |entry| entry[:delete] }.size # bulk entries carrying a :delete action
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [indexed, deleted] # value of the work unit; the base class passes it to the on_progress callback
+      end
+    end
+
+    wait! # block until the work units scheduled above have resolved
+  end
+
+  private
+
+  def index # the Chewy index this importer writes to
+    AccountsIndex
+  end
+
+  def scope # the records to import
+    Account.searchable
+  end
+end
diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb
new file mode 100644 (file)
index 0000000..ea522c6
--- /dev/null
@@ -0,0 +1,87 @@
+# frozen_string_literal: true
+
+class Importer::BaseImporter # Shared machinery for index importers: schedules work units on an executor and tracks them
+  # @param [Integer] batch_size
+  # @param [Concurrent::ThreadPoolExecutor] executor
+  def initialize(batch_size:, executor:)
+    @batch_size = batch_size
+    @executor   = executor
+    @wait_for   = Concurrent::Set.new # work units being tracked until they resolve
+  end
+
+  # Register the callback to run when a concurrent work unit completes successfully
+  # @param [Proc] block
+  def on_progress(&block)
+    @on_progress = block
+  end
+
+  # Register the callback to run when a concurrent work unit fails
+  # @param [Proc] block
+  def on_failure(&block)
+    @on_failure = block
+  end
+
+  # Reduce resource usage during indexing and improve its speed by setting refresh_interval to -1
+  def optimize_for_import!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } }
+  end
+
+  # Restore the refresh interval declared in the index definition
+  def optimize_for_search!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } }
+  end
+
+  # Estimate the number of documents that would be indexed, from pg_class.reltuples of the target table. Not exact!
+  # @return [Integer]
+  def estimate!
+    ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i }
+  end
+
+  # Import data from the database into the index. Raises NotImplementedError here; subclasses provide it
+  def import!
+    raise NotImplementedError
+  end
+
+  # Remove documents from the index that no longer exist in the database
+  def clean_up!
+    index.scroll_batches do |documents|
+      ids           = documents.map { |doc| doc['_id'] }
+      existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true }
+      tmp           = ids.reject { |id| existence_map[id] } # ids found in the index but not in the database
+
+      next if tmp.empty?
+
+      in_work_unit(tmp) do |deleted_ids|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [0, bulk.size] # nothing indexed; every bulk entry counts as a deletion
+      end
+    end
+
+    wait! # block until the deletions scheduled above have resolved
+  end
+
+  protected
+
+  def in_work_unit(*args, &block) # Schedule the block on the executor as a future and track it in @wait_for
+    work_unit = Concurrent::Promises.future_on(@executor, *args, &block)
+
+    work_unit.on_fulfillment!(&@on_progress)
+    work_unit.on_rejection!(&@on_failure)
+    work_unit.on_resolution! { @wait_for.delete(work_unit) } # stop tracking once resolved, whatever the outcome
+
+    @wait_for << work_unit
+  rescue Concurrent::RejectedExecutionError
+    sleep(0.1) && retry # Backpressure: the executor rejected the work unit, so pause briefly and submit it again
+  end
+
+  def wait! # Block until every work unit currently in @wait_for has resolved
+    Concurrent::Promises.zip(*@wait_for).wait
+  end
+
+  def index # The Chewy index to operate on; subclasses must override
+    raise NotImplementedError
+  end
+end
diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb
new file mode 100644 (file)
index 0000000..7c65325
--- /dev/null
@@ -0,0 +1,89 @@
+# frozen_string_literal: true
+
+class Importer::StatusesIndexImporter < Importer::BaseImporter # Imports statuses into StatusesIndex
+  def import!
+    # The idea is that instead of iterating over all statuses in the database
+    # and calculating the searchable_by for each of them (the majority of which
+    # would be empty), we approach the index from the other end
+
+    scopes.each do |scope|
+      # We could be tempted to keep track of status IDs we have already processed
+      # from a different scope to avoid indexing them multiple times, but that
+      # could end up being a very large array
+
+      scope.find_in_batches(batch_size: @batch_size) do |tmp|
+        in_work_unit(tmp.map(&:status_id)) do |status_ids| # only the IDs are passed; statuses are loaded inside the work unit
+          bulk = ActiveRecord::Base.connection_pool.with_connection do
+            Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body
+          end
+
+          indexed = 0
+          deleted = 0
+
+          # We can't use the delete_if proc to do the filtering because delete_if
+          # is called before rendering the data and we need to filter based on
+          # the rendered searchable_by value, so this filtering happens here instead
+          bulk.map! do |entry|
+            new_entry = begin
+              if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
+                { delete: entry[:index].except(:data) } # blank searchable_by: turn the index action into a delete
+              else
+                entry
+              end
+            end
+
+            if new_entry[:index]
+              indexed += 1
+            else
+              deleted += 1
+            end
+
+            new_entry
+          end
+
+          Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+          [indexed, deleted] # value of the work unit; the base class passes it to the on_progress callback
+        end
+      end
+    end
+
+    wait! # block until the work units scheduled above have resolved
+  end
+
+  private
+
+  def index # the Chewy index this importer writes to
+    StatusesIndex
+  end
+
+  def scopes # every scope yields rows that respond to status_id
+    [
+      local_statuses_scope,
+      local_mentions_scope,
+      local_favourites_scope,
+      local_votes_scope,
+      local_bookmarks_scope,
+    ]
+  end
+
+  def local_mentions_scope # non-silent mentions of local accounts
+    Mention.where(account: Account.local, silent: false).select(:id, :status_id)
+  end
+
+  def local_favourites_scope # favourites made by local accounts
+    Favourite.where(account: Account.local).select(:id, :status_id)
+  end
+
+  def local_bookmarks_scope # NOTE(review): no account filter here; presumably bookmarks only exist for local accounts, confirm
+    Bookmark.select(:id, :status_id)
+  end
+
+  def local_votes_scope # polls that have at least one vote from a local account
+    Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id')
+  end
+
+  def local_statuses_scope # for a reblog, status_id is the reblogged status rather than the reblog itself
+    Status.local.select('id, coalesce(reblog_of_id, id) as status_id')
+  end
+end
diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb
new file mode 100644 (file)
index 0000000..f5bd8f0
--- /dev/null
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class Importer::TagsIndexImporter < Importer::BaseImporter # Imports tags into TagsIndex
+  def import! # Walks the index's default scope in batches of @batch_size; each batch is handed to one work unit
+    index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |tags|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body
+
+        indexed = bulk.select { |entry| entry[:index] }.size # bulk entries carrying an :index action
+        deleted = bulk.select { |entry| entry[:delete] }.size # bulk entries carrying a :delete action
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [indexed, deleted] # value of the work unit; the base class passes it to the on_progress callback
+      end
+    end
+
+    wait! # block until the work units scheduled above have resolved
+  end
+
+  private
+
+  def index # the Chewy index this importer writes to
+    TagsIndex
+  end
+end
index 608e337924d469db0aa48f1b7bc818fb02db3a8b..74723e35c99c963152b8f206d574f033f309392b 100644 (file)
@@ -11,11 +11,11 @@ class Trends::History
     end
 
     def uses
-      redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum
+      with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum }
     end
 
     def accounts
-      redis.pfcount(*@days.map { |day| day.key_for(:accounts) })
+      with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) }
     end
   end
 
@@ -33,19 +33,21 @@ class Trends::History
     attr_reader :day
 
     def accounts
-      redis.pfcount(key_for(:accounts))
+      with_redis { |redis| redis.pfcount(key_for(:accounts)) }
     end
 
     def uses
-      redis.get(key_for(:uses))&.to_i || 0
+      with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 }
     end
 
     def add(account_id)
-      redis.pipelined do
-        redis.incrby(key_for(:uses), 1)
-        redis.pfadd(key_for(:accounts), account_id)
-        redis.expire(key_for(:uses), EXPIRE_AFTER)
-        redis.expire(key_for(:accounts), EXPIRE_AFTER)
+      with_redis do |redis|
+        redis.pipelined do |pipeline|
+          pipeline.incrby(key_for(:uses), 1)
+          pipeline.pfadd(key_for(:accounts), account_id)
+          pipeline.expire(key_for(:uses), EXPIRE_AFTER)
+          pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
+        end
       end
     end
 
index 74f980ba11bc321c861e5d0592677e94c9f0dbba..b579ebc143477061794b93add25bdb76543855bc 100644 (file)
@@ -16,19 +16,21 @@ module Mastodon
       StatusesIndex,
     ].freeze
 
-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
 
       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.
 
       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overridden with --no-import.
     LONG_DESC
     def deploy
       if options[:concurrency] < 1
@@ -49,7 +51,9 @@ module Mastodon
         end
       end
 
-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
 
       # First, ensure all indices are created and have the correct
       # structure, so that live data can already be written
@@ -59,99 +63,46 @@ module Mastodon
         index.specification.lock!
       end
 
+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
+
       reset_connection_pools!
 
-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0
 
-      progress.title = 'Estimating workload '
+      indices.each do |index|
+        importer = importers[index]
+        importer.optimize_for_import!
+
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end
 
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end
 
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
-      indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
-
-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
-
-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-
-                  progress.total = nil
-                end
-
-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-
-                progress.progress += records.size
-
-                added.increment(index_count)
-                removed.increment(delete_count)
-
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
-
-          futures.map(&:value)
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end
+
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
         end
+      ensure
+        importer.optimize_for_search!
       end
 
-      progress.title = ''
-      progress.stop
+      progress.title = 'Done! '
+      progress.finish
 
-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)
     end
   end
 end