mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 11:23:25 +08:00
0784c28702
For Redis connections that operate over TLS, we need to ensure that we are setting the correct arguments for the Redis client. We can utilise the existing environment variable `DISCOURSE_REDIS_USE_SSL` to toggle this behaviour. No SSL verification is performed for two reasons: - the Discourse application will perform a verification against any FQDN as specified for the Redis host - the healthcheck is run against the _resolved_ IP address for the Redis hostname, and any SSL verification will always fail against a direct IP address If no SSL arguments are provided, the IP address is never cached against the hostname as no healthy address is ever found in the HealthyCache.
346 lines
8.0 KiB
Ruby
Executable File
346 lines
8.0 KiB
Ruby
Executable File
#!/usr/bin/env ruby
|
|
# frozen_string_literal: true
|
|
|
|
# Specifying this env var ensures ruby can load gems installed via the Discourse
|
|
# project Gemfile (e.g. pg, redis).
|
|
ENV['BUNDLE_GEMFILE'] ||= '/var/www/discourse/Gemfile'
|
|
require 'bundler/setup'
|
|
|
|
require 'ipaddr'
|
|
require 'pg'
|
|
require 'redis'
|
|
require 'resolv'
|
|
require 'socket'
|
|
require 'time'
|
|
|
|
CRITICAL_HOST_ENV_VARS = %w{
|
|
DISCOURSE_DB_HOST
|
|
DISCOURSE_DB_REPLICA_HOST
|
|
DISCOURSE_REDIS_HOST
|
|
DISCOURSE_REDIS_SLAVE_HOST
|
|
DISCOURSE_REDIS_REPLICA_HOST
|
|
}
|
|
HOST_RESOLVER_CACHE = {}
|
|
HOST_HEALTHY_CACHE = {}
|
|
HOSTS_PATH = ENV['DISCOURSE_DNS_CACHE_HOSTS_FILE'] || "/etc/hosts"
|
|
REFRESH_SECONDS = ENV['DISCOURSE_DNS_CACHE_REFRESH_SECONDS'].to_i || 30
|
|
|
|
module DNSClient
|
|
def dns_client_with_timeout
|
|
Resolv::DNS.open do |dns_client|
|
|
dns_client.timeouts = 2
|
|
yield dns_client
|
|
end
|
|
end
|
|
end
|
|
|
|
class Name
|
|
include DNSClient
|
|
|
|
def initialize(hostname)
|
|
@name = hostname
|
|
end
|
|
|
|
def resolve
|
|
dns_client_with_timeout do |dns_client|
|
|
[].tap do |addresses|
|
|
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::A).map(&:address))
|
|
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::AAAA).map(&:address))
|
|
end.map(&:to_s)
|
|
end
|
|
end
|
|
end
|
|
|
|
class SRVName
|
|
include DNSClient
|
|
|
|
def initialize(srv_hostname)
|
|
@name = srv_hostname
|
|
end
|
|
|
|
def resolve
|
|
dns_client_with_timeout do |dns_client|
|
|
[].tap do |addresses|
|
|
targets = dns_client.getresources(@name, Resolv::DNS::Resource::IN::SRV)
|
|
addresses.concat(targets.map { |t| Name.new(t.target.to_s).resolve }.flatten)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
CacheMeta = Struct.new(:first_seen, :last_seen)
|
|
|
|
class ResolverCache
|
|
def initialize(name)
|
|
# instance of Name|SRVName
|
|
@name = name
|
|
|
|
# {IPv4/IPv6 address: CacheMeta}
|
|
@cached = {}
|
|
end
|
|
|
|
# resolve returns a list of resolved addresses ordered by the time first seen,
|
|
# most recently seen at the head of the list.
|
|
# Addresses last seen more than 30 minutes ago are evicted from the cache on
|
|
# a call to resolve().
|
|
# If an exception occurs during DNS resolution we return whatever addresses are
|
|
# cached.
|
|
def resolve
|
|
@name.resolve.each do |address|
|
|
if @cached[address]
|
|
@cached[address].last_seen = Time.now.utc
|
|
else
|
|
@cached[address] = CacheMeta.new(Time.now.utc, Time.now.utc)
|
|
end
|
|
end
|
|
ensure
|
|
@cached = @cached.delete_if { |_, meta| Time.now.utc - 30 * 60 > meta.last_seen }
|
|
@cached.sort_by { |_, meta| meta.first_seen }.reverse.map(&:first)
|
|
end
|
|
end
|
|
|
|
class HealthyCache
|
|
def initialize(resolver_cache, check)
|
|
@resolver_cache = resolver_cache # instance of ResolverCache
|
|
@check = check # lambda function to perform for health checks
|
|
@cached = nil # a single IP address that was most recently found to be healthy
|
|
end
|
|
|
|
def first_healthy
|
|
address = @resolver_cache.resolve.lazy.select { |addr| @check.call(addr) }.first
|
|
if !nilempty(address).nil?
|
|
@cached = address
|
|
end
|
|
@cached
|
|
end
|
|
end
|
|
|
|
def redis_healthcheck(host:, password:)
|
|
client_opts = {
|
|
host: host,
|
|
password: password,
|
|
timeout: 1,
|
|
}
|
|
if !nilempty(ENV['DISCOURSE_REDIS_USE_SSL']).nil? then
|
|
client_opts[:ssl] = true
|
|
client_opts[:ssl_params] = {
|
|
verify_mode: OpenSSL::SSL::VERIFY_NONE,
|
|
}
|
|
end
|
|
client = Redis.new(**client_opts)
|
|
response = client.ping
|
|
response == "PONG"
|
|
rescue
|
|
false
|
|
ensure
|
|
client.close if client
|
|
end
|
|
|
|
def postgres_healthcheck(host:, user:, password:, dbname:)
|
|
response = PG::Connection.ping(
|
|
host: host,
|
|
user: user,
|
|
password: password,
|
|
dbname: dbname,
|
|
connect_timeout: 2, # minimum
|
|
)
|
|
response == PG::Constants::PQPING_OK
|
|
rescue
|
|
false
|
|
end
|
|
|
|
HEALTH_CHECKS = {
|
|
"DISCOURSE_DB_HOST": lambda { |addr|
|
|
postgres_healthcheck(
|
|
host: addr,
|
|
user: ENV["DISCOURSE_DB_USER_NAME"],
|
|
password: ENV["DISCOURSE_DB_PASSWORD"],
|
|
dbname: ENV["DISCOURSE_DB_NAME"])},
|
|
"DISCOURSE_DB_REPLICA_HOST": lambda { |addr|
|
|
postgres_healthcheck(
|
|
host: addr,
|
|
user: ENV["DISCOURSE_DB_USER_NAME"],
|
|
password: ENV["DISCOURSE_DB_PASSWORD"],
|
|
dbname: ENV["DISCOURSE_DB_NAME"])},
|
|
"DISCOURSE_REDIS_HOST": lambda { |addr|
|
|
redis_healthcheck(
|
|
host: addr,
|
|
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
|
|
"DISCOURSE_REDIS_REPLICA_HOST": lambda { |addr|
|
|
redis_healthcheck(
|
|
host: addr,
|
|
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
|
|
}
|
|
|
|
def log(msg)
|
|
STDERR.puts "#{Time.now.utc.iso8601}: #{msg}"
|
|
end
|
|
|
|
def error(msg)
|
|
log(msg)
|
|
end
|
|
|
|
def swap_address(hosts, name, ips)
|
|
new_file = []
|
|
|
|
hosts.split("\n").each do |line|
|
|
line.strip!
|
|
if line[0] != '#'
|
|
_, hostname = line.split(/\s+/)
|
|
next if hostname == name
|
|
end
|
|
new_file << line
|
|
new_file << "\n"
|
|
end
|
|
|
|
ips.each do |ip|
|
|
new_file << "#{ip} #{name} # AUTO GENERATED: #{Time.now.utc.iso8601}\n"
|
|
end
|
|
|
|
new_file.join
|
|
end
|
|
|
|
def send_counter(name, description, labels, value)
|
|
host = "localhost"
|
|
port = ENV.fetch("DISCOURSE_PROMETHEUS_COLLECTOR_PORT", 9405).to_i
|
|
|
|
if labels
|
|
labels = labels.map do |k, v|
|
|
"\"#{k}\": \"#{v}\""
|
|
end.join(",")
|
|
else
|
|
labels = ""
|
|
end
|
|
|
|
json = <<~JSON
|
|
{
|
|
"_type": "Custom",
|
|
"type": "Counter",
|
|
"name": "#{name}",
|
|
"description": "#{description}",
|
|
"labels": { #{labels} },
|
|
"value": #{value}
|
|
}
|
|
JSON
|
|
|
|
payload = +"POST /send-metrics HTTP/1.1\r\n"
|
|
payload << "Host: #{host}\r\n"
|
|
payload << "Connection: Close\r\n"
|
|
payload << "Content-Type: application/json\r\n"
|
|
payload << "Content-Length: #{json.bytesize}\r\n"
|
|
payload << "\r\n"
|
|
payload << json
|
|
|
|
socket = TCPSocket.new host, port
|
|
socket.write payload
|
|
socket.flush
|
|
result = socket.read
|
|
first_line = result.split("\n")[0]
|
|
if first_line.strip != "HTTP/1.1 200 OK"
|
|
error("Failed to report metric #{result}")
|
|
end
|
|
socket.close
|
|
rescue => e
|
|
error("Failed to send metric to Prometheus #{e}")
|
|
end
|
|
|
|
def report_success
|
|
send_counter('critical_dns_successes_total', 'critical DNS resolution success', nil, 1)
|
|
end
|
|
|
|
def report_failure(errors)
|
|
errors.each do |host, count|
|
|
send_counter('critical_dns_failures_total', 'critical DNS resolution failures', host ? { host: host } : nil, count)
|
|
end
|
|
end
|
|
|
|
def nilempty(v)
|
|
if v.nil?
|
|
nil
|
|
elsif v.respond_to?(:empty?) && v.empty?
|
|
nil
|
|
else
|
|
v
|
|
end
|
|
end
|
|
|
|
def env_srv_name(env_name)
|
|
nilempty(ENV["#{env_name}_SRV"])
|
|
end
|
|
|
|
def run(hostname_vars)
|
|
# HOSTNAME: [IP_ADDRESS, ...]
|
|
# this will usually be a single address
|
|
resolved = {}
|
|
errors = Hash.new(0)
|
|
|
|
hostname_vars.each do |var|
|
|
name = ENV[var]
|
|
HOST_RESOLVER_CACHE[var] ||= ResolverCache.new(
|
|
if (srv_name = env_srv_name(var))
|
|
SRVName.new(srv_name)
|
|
else
|
|
Name.new(name)
|
|
end
|
|
)
|
|
|
|
HOST_HEALTHY_CACHE[var] ||= HealthyCache.new(HOST_RESOLVER_CACHE[var], HEALTH_CHECKS[var.to_sym])
|
|
|
|
begin
|
|
if (address = HOST_HEALTHY_CACHE[var].first_healthy)
|
|
resolved[name] = [address]
|
|
else
|
|
error("#{var}: #{name}: no address")
|
|
errors[name] += 1
|
|
end
|
|
rescue => e
|
|
error("#{var}: #{name}: #{e}")
|
|
errors[name] += 1
|
|
end
|
|
end
|
|
|
|
hosts_content = File.read(HOSTS_PATH)
|
|
hosts = Resolv::Hosts.new(HOSTS_PATH)
|
|
|
|
changed = false
|
|
resolved.each do |hostname, ips|
|
|
if hosts.getaddresses(hostname).map(&:to_s).sort != ips.sort
|
|
log("IP addresses for #{hostname} changed to #{ips}")
|
|
hosts_content = swap_address(hosts_content, hostname, ips)
|
|
changed = true
|
|
end
|
|
end
|
|
|
|
if changed
|
|
File.write(HOSTS_PATH, hosts_content)
|
|
end
|
|
|
|
rescue => e
|
|
error("DNS lookup failed: #{e}")
|
|
errors[nil] = 1
|
|
ensure
|
|
if errors == {}
|
|
report_success
|
|
else
|
|
report_failure(errors)
|
|
end
|
|
end
|
|
|
|
# If any of the host variables are an explicit IP we will not attempt to cache
|
|
# them.
|
|
all_hostname_vars = CRITICAL_HOST_ENV_VARS.select do |name|
|
|
begin
|
|
host = ENV[name]
|
|
next if nilempty(host).nil?
|
|
IPAddr.new(host)
|
|
false
|
|
rescue IPAddr::InvalidAddressError, IPAddr::AddressFamilyError
|
|
true
|
|
end
|
|
end
|
|
|
|
while true
|
|
run(all_hostname_vars)
|
|
sleep REFRESH_SECONDS
|
|
end
|