mirror of
https://github.com/discourse/discourse.git
synced 2024-11-29 14:44:31 +08:00
7c3a29c9d6
* Updates GitHub Actions * Switches from `bundler/inline` to an optional group in the `Gemfile` because the previous solution didn't work well with rspec * Adds the converter framework and tests * Allows loading private converters (see README) * Switches from multiple CLI tools to a single CLI * Makes DB connections reusable and adds a new abstraction for the `IntermediateDB` * `IntermediateDB` acts as an interface for IPC calls when a converter steps runs in parallel (forks). Only the main process writes to the DB. * Includes a simple example implementation of a converter for now.
102 lines
2.4 KiB
Ruby
102 lines
2.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "extralite"
|
|
require "lru_redux"
|
|
|
|
module Migrations::Database
|
|
class Connection
|
|
TRANSACTION_BATCH_SIZE = 1000
|
|
PREPARED_STATEMENT_CACHE_SIZE = 5
|
|
|
|
def self.open_database(path:)
|
|
FileUtils.mkdir_p(File.dirname(path))
|
|
|
|
db = Extralite::Database.new(path)
|
|
db.pragma(
|
|
busy_timeout: 60_000, # 60 seconds
|
|
journal_mode: "wal",
|
|
synchronous: "off",
|
|
temp_store: "memory",
|
|
locking_mode: "normal",
|
|
cache_size: -10_000, # 10_000 pages
|
|
)
|
|
db
|
|
end
|
|
|
|
attr_reader :db, :path
|
|
|
|
def initialize(path:, transaction_batch_size: TRANSACTION_BATCH_SIZE)
|
|
@path = path
|
|
@transaction_batch_size = transaction_batch_size
|
|
@db = self.class.open_database(path:)
|
|
@statement_counter = 0
|
|
|
|
# don't cache too many prepared statements
|
|
@statement_cache = PreparedStatementCache.new(PREPARED_STATEMENT_CACHE_SIZE)
|
|
|
|
@fork_hooks = setup_fork_handling
|
|
end
|
|
|
|
def close
|
|
close_connection(keep_path: false)
|
|
|
|
before_hook, after_hook = @fork_hooks
|
|
::Migrations::ForkManager.remove_before_fork_hook(before_hook)
|
|
::Migrations::ForkManager.remove_after_fork_parent_hook(after_hook)
|
|
end
|
|
|
|
def closed?
|
|
!@db || @db.closed?
|
|
end
|
|
|
|
def insert(sql, parameters = [])
|
|
begin_transaction if @statement_counter == 0
|
|
|
|
stmt = @statement_cache.getset(sql) { @db.prepare(sql) }
|
|
stmt.execute(parameters)
|
|
|
|
if (@statement_counter += 1) >= @transaction_batch_size
|
|
commit_transaction
|
|
@statement_counter = 0
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def begin_transaction
|
|
return if @db.transaction_active?
|
|
|
|
@db.execute("BEGIN DEFERRED TRANSACTION")
|
|
end
|
|
|
|
def commit_transaction
|
|
return unless @db.transaction_active?
|
|
|
|
@db.execute("COMMIT")
|
|
end
|
|
|
|
def close_connection(keep_path:)
|
|
return if !@db
|
|
|
|
commit_transaction
|
|
@statement_cache.clear
|
|
@db.close
|
|
|
|
@path = nil unless keep_path
|
|
@db = nil
|
|
@statement_counter = 0
|
|
end
|
|
|
|
def setup_fork_handling
|
|
before_hook = ::Migrations::ForkManager.before_fork { close_connection(keep_path: true) }
|
|
|
|
after_hook =
|
|
::Migrations::ForkManager.after_fork_parent do
|
|
@db = self.class.open_database(path: @path) if @path
|
|
end
|
|
|
|
[before_hook, after_hook]
|
|
end
|
|
end
|
|
end
|