BACKGROUND_MIGRATION_BATCH_SIZE = 1000 # Number of rows to process per job
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1000 # Number of jobs to bulk queue at a time
-
+
# Gets an estimated number of rows for a table
def estimate_rows_in_table(table_name)
exec_query('SELECT reltuples FROM pg_class WHERE relname = ' +
end
table = Arel::Table.new(table_name)
-
+
total = estimate_rows_in_table(table_name).to_i
if total == 0
count_arel = table.project(Arel.star.count.as('count'))
count_arel = yield table, count_arel if block_given?
-
+
total = exec_query(count_arel.to_sql).to_hash.first['count'].to_i
-
+
return if total == 0
end
# In case there are no rows but we didn't catch it in the estimated size:
return unless first_row
start_id = first_row['id'].to_i
-
+
say "Migrating #{table_name}.#{column} (~#{total.to_i} rows)"
started_time = Time.now
migrated = 0
loop do
stop_row = nil
-
+
suppress_messages do
stop_arel = table.project(table[:id])
.where(table[:id].gteq(start_id))
execute(update_arel.to_sql)
end
-
+
migrated += batch_size
if Time.now - last_time > 1
status = "Migrated #{migrated} rows"
-
+
percentage = 100.0 * migrated / total
status += " (~#{sprintf('%.2f', percentage)}%, "
-
+
remaining_time = (100.0 - percentage) * (Time.now - started_time) / percentage
-
+
status += "#{(remaining_time / 60).to_i}:"
status += sprintf('%02d', remaining_time.to_i % 60)
status += ' remaining, '
-
+
# Tell users not to interrupt if we're almost done.
if remaining_time > 10
status += 'safe to interrupt'
else
status += 'DO NOT interrupt'
end
-
+
status += ')'
-
+
say status, true
last_time = Time.now
end
check_trigger_permissions!(table)
trigger_name = rename_trigger_name(table, old, new)
-
+
# If we were in the middle of update_column_in_batches, we should remove
# the old column and start over, as we have no idea where we were.
if column_for(table, new)
else
remove_rename_triggers_for_mysql(trigger_name)
end
-
+
remove_column(table, new)
end
temp_column = rename_column_name(column)
rename_column_concurrently(table, column, temp_column, type: new_type)
-
+
# Primary keys don't necessarily have an associated index.
if ActiveRecord::Base.get_primary_key(table) == column.to_s
old_pk_index_name = "index_#{table}_on_#{column}"
new_pk_index_name = "index_#{table}_on_#{column}_cm"
-
+
unless indexes_for(table, column).find{|i| i.name == old_pk_index_name}
add_concurrent_index(table, [temp_column], {
unique: true,
# Wait for the indices to be built
indexes_for(table, column).each do |index|
expected_name = index.name + '_cm'
-
+
puts "Waiting for index #{expected_name}"
sleep 1 until indexes_for(table, temp_column).find {|i| i.name == expected_name }
end
-
+
was_primary = (ActiveRecord::Base.get_primary_key(table) == column.to_s)
old_default_fn = column_for(table, column).default_function
-
+
old_fks = []
if was_primary
# Get any foreign keys pointing at this column we need to recreate, and
target_col: temp_column,
on_delete: extract_foreign_key_action(old_fk['on_delete'])
)
-
+
remove_foreign_key(old_fk['src_table'], name: old_fk['name'])
end
end
transaction do
# This has to be performed in a transaction as otherwise we might have
# inconsistent data.
-
+
cleanup_concurrent_column_rename(table, column, temp_column)
rename_column(table, temp_column, column)
-
+
# If there was an old default function, we didn't copy it. Do that now
# in the transaction, so we don't miss anything.
change_column_default(table, column, -> { old_default_fn }) if old_default_fn
end
-
+
# Rename any indices back to what they should be.
indexes_for(table, column).each do |index|
next unless index.name.end_with?('_cm')
real_index_name = index.name.sub(/_cm$/, '')
rename_index(table, index.name, real_index_name)
end
-
+
# Rename any foreign keys back to names based on the real column.
foreign_keys_for(table, column).each do |fk|
old_fk_name = concurrent_foreign_key_name(fk.from_table, temp_column, 'id')
execute("ALTER TABLE #{fk.from_table} RENAME CONSTRAINT " +
"#{old_fk_name} TO #{new_fk_name}")
end
-
+
# Rename any foreign keys from other tables to names based on the real
# column.
old_fks.each do |old_fk|
execute("ALTER TABLE #{old_fk['src_table']} RENAME CONSTRAINT " +
"#{old_fk_name} TO #{new_fk_name}")
end
-
+
# If the old column was a primary key, mark the new one as a primary key.
if was_primary
execute("ALTER TABLE #{table} ADD PRIMARY KEY USING INDEX " +
# This is necessary as we can't properly rename indexes such as
# "ci_taggings_idx".
name = index.name + '_cm'
-
+
# If the order contained the old column, map it to the new one.
order = index.orders
if order.key?(old)