mirror of
https://github.com/discourse/discourse.git
synced 2025-01-19 12:52:45 +08:00
DEV: refactor cache_critical_dns for SRV RR awareness
Modify the cache_critical_dns script for SRV RR awareness. The new
behaviour is only enabled when one or more of the following environment
variables are present (and only for a host where the `DISCOURSE_*_HOST_SRV`
variable is present):
- `DISCOURSE_DB_HOST_SRV`
- `DISCOURSE_DB_REPLICA_HOST_SRV`
- `DISCOURSE_REDIS_HOST_SRV`
- `DISCOURSE_REDIS_REPLICA_HOST_SRV`
Some minor changes in refactor to original script behaviour:
- add Name and SRVName classes for storing resolved addresses for a hostname
- pass DNS client into main run loop instead of creating inside the loop
- ensure all times are UTC
- add environment override for system hosts file path and time between DNS
checks mainly for testing purposes
The environment variable for `BUNDLE_GEMFILE` is set to enables Ruby to
load gems that are installed and vendored via the project's Gemfile.
This script is usually not run from the project directory as it is
configured as a system service (see
71ba9fb7b5/templates/cache-dns.template.yml (L19)
)
and therefore cannot load gems like `pg` or `redis` from the default
load paths. Setting this environment variable configures bundler to look
in the correct project directory during it's setup phase.
When a `DISCOURSE_*_HOST_SRV` environment variable is present, the
decision for which target to cache is as follows:
- resolve the SRV targets for the provided hostname
- lookup the addresses for all of the resolved SRV targets via the
A and AAAA RRs for the target's hostname
- perform a protocol-aware healthcheck (PostgreSQL or Redis pings)
- pick the newest target that passes the healthcheck
From there, the resolved address for the SRV target is cached against
the hostname as specified by the original form of the environment
variable.
For example: The hostname specified by the `DISCOURSE_DB_HOST` record
is `database.example.com`, and the `DISCOURSE_DB_HOST_SRV` record is
`database._postgresql._tcp.sd.example.com`. An SRV RR lookup will return
zero or more targets. Each of the targets will be queried for A and AAAA
RRs. For each of the addresses returned, the newest address that passes
a protocol-aware healthcheck will be cached. This address is cached so
that if any newer address for the SRV target appears we can perform a
health check and prefer the newer address if the check passes.
All resolved SRV targets are cached for a minimum of 30 minutes in memory
so that we can prefer newer hosts over older hosts when more than one target
is returned. Any host in the cache that hasn't been seen for more than 30
minutes is purged.
See /t/61485.
This commit is contained in:
parent
9524c1320a
commit
971409741f
|
@ -1,14 +1,17 @@
|
|||
#!/usr/bin/env ruby
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Specifying this env var ensures ruby can load gems installed via the Discourse
|
||||
# project Gemfile (e.g. pg, redis).
|
||||
ENV['BUNDLE_GEMFILE'] ||= '/var/www/discourse/Gemfile'
|
||||
require 'bundler/setup'
|
||||
|
||||
require 'ipaddr'
|
||||
require 'pg'
|
||||
require 'redis'
|
||||
require 'resolv'
|
||||
require 'time'
|
||||
require 'socket'
|
||||
|
||||
REFRESH_SECONDS = 30
|
||||
|
||||
HOSTS_PATH = "/etc/hosts"
|
||||
require 'time'
|
||||
|
||||
CRITICAL_HOST_ENV_VARS = %w{
|
||||
DISCOURSE_DB_HOST
|
||||
|
@ -17,9 +20,153 @@ CRITICAL_HOST_ENV_VARS = %w{
|
|||
DISCOURSE_REDIS_SLAVE_HOST
|
||||
DISCOURSE_REDIS_REPLICA_HOST
|
||||
}
|
||||
HOST_RESOLVER_CACHE = {}
|
||||
HOST_HEALTHY_CACHE = {}
|
||||
HOSTS_PATH = ENV['DISCOURSE_DNS_CACHE_HOSTS_FILE'] || "/etc/hosts"
|
||||
REFRESH_SECONDS = ENV['DISCOURSE_DNS_CACHE_REFRESH_SECONDS'].to_i || 30
|
||||
|
||||
module DNSClient
|
||||
def dns_client_with_timeout
|
||||
Resolv::DNS.open do |dns_client|
|
||||
dns_client.timeouts = 2
|
||||
yield dns_client
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Name
|
||||
include DNSClient
|
||||
|
||||
def initialize(hostname)
|
||||
@name = hostname
|
||||
end
|
||||
|
||||
def resolve
|
||||
dns_client_with_timeout do |dns_client|
|
||||
[].tap do |addresses|
|
||||
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::A).map(&:address))
|
||||
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::AAAA).map(&:address))
|
||||
end.map(&:to_s)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class SRVName
|
||||
include DNSClient
|
||||
|
||||
def initialize(srv_hostname)
|
||||
@name = srv_hostname
|
||||
end
|
||||
|
||||
def resolve
|
||||
dns_client_with_timeout do |dns_client|
|
||||
[].tap do |addresses|
|
||||
targets = dns_client.getresources(@name, Resolv::DNS::Resource::IN::SRV)
|
||||
addresses.concat(targets.map { |t| Name.new(t.target.to_s).resolve }.flatten)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
CacheMeta = Struct.new(:first_seen, :last_seen)
|
||||
|
||||
class ResolverCache
|
||||
def initialize(name)
|
||||
# instance of Name|SRVName
|
||||
@name = name
|
||||
|
||||
# {IPv4/IPv6 address: CacheMeta}
|
||||
@cached = {}
|
||||
end
|
||||
|
||||
# resolve returns a list of resolved addresses ordered by the time first seen,
|
||||
# most recently seen at the head of the list.
|
||||
# Addresses last seen more than 30 minutes ago are evicted from the cache on
|
||||
# a call to resolve().
|
||||
# If an exception occurs during DNS resolution we return whatever addresses are
|
||||
# cached.
|
||||
def resolve
|
||||
@name.resolve.each do |address|
|
||||
if @cached[address]
|
||||
@cached[address].last_seen = Time.now.utc
|
||||
else
|
||||
@cached[address] = CacheMeta.new(Time.now.utc, Time.now.utc)
|
||||
end
|
||||
end
|
||||
ensure
|
||||
@cached = @cached.delete_if { |_, meta| Time.now.utc - 30 * 60 > meta.last_seen }
|
||||
@cached.sort_by { |_, meta| meta.first_seen }.reverse.map(&:first)
|
||||
end
|
||||
end
|
||||
|
||||
class HealthyCache
|
||||
def initialize(resolver_cache, check)
|
||||
@resolver_cache = resolver_cache # instance of ResolverCache
|
||||
@check = check # lambda function to perform for health checks
|
||||
@cached = nil # a single IP address that was most recently found to be healthy
|
||||
end
|
||||
|
||||
def first_healthy
|
||||
address = @resolver_cache.resolve.lazy.select { |addr| @check.call(addr) }.first
|
||||
if !nilempty(address).nil?
|
||||
@cached = address
|
||||
end
|
||||
@cached
|
||||
end
|
||||
end
|
||||
|
||||
def redis_healthcheck(host:, password:)
|
||||
client = Redis.new(
|
||||
host: host,
|
||||
password: password,
|
||||
timeout: 1,
|
||||
)
|
||||
response = client.ping
|
||||
response == "PONG"
|
||||
rescue
|
||||
false
|
||||
ensure
|
||||
client.close if client
|
||||
end
|
||||
|
||||
def postgres_healthcheck(host:, user:, password:, dbname:)
|
||||
response = PG::Connection.ping(
|
||||
host: host,
|
||||
user: user,
|
||||
password: password,
|
||||
dbname: dbname,
|
||||
connect_timeout: 2, # minimum
|
||||
)
|
||||
response == PG::Constants::PQPING_OK
|
||||
rescue
|
||||
false
|
||||
end
|
||||
|
||||
HEALTH_CHECKS = {
|
||||
"DISCOURSE_DB_HOST": lambda { |addr|
|
||||
postgres_healthcheck(
|
||||
host: addr,
|
||||
user: ENV["DISCOURSE_DB_USER_NAME"],
|
||||
password: ENV["DISCOURSE_DB_PASSWORD"],
|
||||
dbname: ENV["DISCOURSE_DB_NAME"])},
|
||||
"DISCOURSE_DB_REPLICA_HOST": lambda { |addr|
|
||||
postgres_healthcheck(
|
||||
host: addr,
|
||||
user: ENV["DISCOURSE_DB_USER_NAME"],
|
||||
password: ENV["DISCOURSE_DB_PASSWORD"],
|
||||
dbname: ENV["DISCOURSE_DB_NAME"])},
|
||||
"DISCOURSE_REDIS_HOST": lambda { |addr|
|
||||
redis_healthcheck(
|
||||
host: addr,
|
||||
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
|
||||
"DISCOURSE_REDIS_REPLICA_HOST": lambda { |addr|
|
||||
redis_healthcheck(
|
||||
host: addr,
|
||||
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
|
||||
}
|
||||
|
||||
def log(msg)
|
||||
STDERR.puts "#{Time.now.iso8601}: #{msg}"
|
||||
STDERR.puts "#{Time.now.utc.iso8601}: #{msg}"
|
||||
end
|
||||
|
||||
def error(msg)
|
||||
|
@ -40,23 +187,12 @@ def swap_address(hosts, name, ips)
|
|||
end
|
||||
|
||||
ips.each do |ip|
|
||||
new_file << "#{ip} #{name} # AUTO GENERATED: #{Time.now.iso8601}\n"
|
||||
new_file << "#{ip} #{name} # AUTO GENERATED: #{Time.now.utc.iso8601}\n"
|
||||
end
|
||||
|
||||
new_file.join
|
||||
end
|
||||
|
||||
def hosts_entries(dns, name)
|
||||
host = ENV[name]
|
||||
|
||||
results = dns.getresources(host, Resolv::DNS::Resource::IN::A)
|
||||
results.concat dns.getresources(host, Resolv::DNS::Resource::IN::AAAA)
|
||||
|
||||
results.map do |result|
|
||||
"#{result.address}"
|
||||
end
|
||||
end
|
||||
|
||||
def send_counter(name, description, labels, value)
|
||||
host = "localhost"
|
||||
port = ENV.fetch("DISCOURSE_PROMETHEUS_COLLECTOR_PORT", 9405).to_i
|
||||
|
@ -111,63 +247,69 @@ def report_failure(errors)
|
|||
end
|
||||
end
|
||||
|
||||
@vars = CRITICAL_HOST_ENV_VARS.map do |name|
|
||||
begin
|
||||
host = ENV[name]
|
||||
next if !host || host.length == 0
|
||||
IPAddr.new(ENV[name])
|
||||
def nilempty(v)
|
||||
if v.nil?
|
||||
nil
|
||||
rescue IPAddr::InvalidAddressError, IPAddr::AddressFamilyError
|
||||
name
|
||||
elsif v.respond_to?(:empty?) && v.empty?
|
||||
nil
|
||||
else
|
||||
v
|
||||
end
|
||||
end.compact
|
||||
end
|
||||
|
||||
def loop
|
||||
def env_srv_name(env_name)
|
||||
nilempty(ENV["#{env_name}_SRV"])
|
||||
end
|
||||
|
||||
def run(hostname_vars)
|
||||
# HOSTNAME: [IP_ADDRESS, ...]
|
||||
# this will usually be a single address
|
||||
resolved = {}
|
||||
errors = Hash.new(0)
|
||||
|
||||
Resolv::DNS.open do |dns|
|
||||
dns.timeouts = 2
|
||||
|
||||
resolved = {}
|
||||
|
||||
hosts = @vars.each do |var|
|
||||
host = ENV[var]
|
||||
|
||||
begin
|
||||
entries = hosts_entries(dns, var)
|
||||
rescue => e
|
||||
error("Failed to resolve DNS for #{name} - #{e}")
|
||||
errors[host] += 1
|
||||
end
|
||||
|
||||
if entries&.length > 0
|
||||
resolved[host] = entries
|
||||
hostname_vars.each do |var|
|
||||
name = ENV[var]
|
||||
HOST_RESOLVER_CACHE[var] ||= ResolverCache.new(
|
||||
if (srv_name = env_srv_name(var))
|
||||
SRVName.new(srv_name)
|
||||
else
|
||||
error("Failed to find any DNS entry for #{var} : #{ENV[var]}")
|
||||
errors[host] += 1
|
||||
Name.new(name)
|
||||
end
|
||||
)
|
||||
|
||||
end
|
||||
HOST_HEALTHY_CACHE[var] ||= HealthyCache.new(HOST_RESOLVER_CACHE[var], HEALTH_CHECKS[var.to_sym])
|
||||
|
||||
hosts_content = File.read(HOSTS_PATH)
|
||||
hosts = Resolv::Hosts.new(HOSTS_PATH)
|
||||
|
||||
changed = false
|
||||
resolved.each do |name, ips|
|
||||
if hosts.getaddresses(name).map(&:to_s).sort != ips.sort
|
||||
log("IP addresses for #{name} changed to #{ips}")
|
||||
hosts_content = swap_address(hosts_content, name, ips)
|
||||
changed = true
|
||||
begin
|
||||
if (address = HOST_HEALTHY_CACHE[var].first_healthy)
|
||||
resolved[name] = [address]
|
||||
else
|
||||
error("#{var}: #{name}: no address")
|
||||
errors[name] += 1
|
||||
end
|
||||
rescue => e
|
||||
error("#{var}: #{name}: #{e}")
|
||||
errors[name] += 1
|
||||
end
|
||||
|
||||
if changed
|
||||
File.write(HOSTS_PATH, hosts_content)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
hosts_content = File.read(HOSTS_PATH)
|
||||
hosts = Resolv::Hosts.new(HOSTS_PATH)
|
||||
|
||||
changed = false
|
||||
resolved.each do |hostname, ips|
|
||||
if hosts.getaddresses(hostname).map(&:to_s).sort != ips.sort
|
||||
log("IP addresses for #{hostname} changed to #{ips}")
|
||||
hosts_content = swap_address(hosts_content, hostname, ips)
|
||||
changed = true
|
||||
end
|
||||
end
|
||||
|
||||
if changed
|
||||
File.write(HOSTS_PATH, hosts_content)
|
||||
end
|
||||
|
||||
rescue => e
|
||||
error("Failed to access DNS - #{e}")
|
||||
error("DNS lookup failed: #{e}")
|
||||
errors[nil] = 1
|
||||
ensure
|
||||
if errors == {}
|
||||
|
@ -177,7 +319,20 @@ ensure
|
|||
end
|
||||
end
|
||||
|
||||
# If any of the host variables are an explicit IP we will not attempt to cache
|
||||
# them.
|
||||
all_hostname_vars = CRITICAL_HOST_ENV_VARS.select do |name|
|
||||
begin
|
||||
host = ENV[name]
|
||||
next if nilempty(host).nil?
|
||||
IPAddr.new(host)
|
||||
false
|
||||
rescue IPAddr::InvalidAddressError, IPAddr::AddressFamilyError
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
while true
|
||||
loop
|
||||
run(all_hostname_vars)
|
||||
sleep REFRESH_SECONDS
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue
Block a user