From e65b97f55b6c5da8859fbd5e5b397cf089bf7716 Mon Sep 17 00:00:00 2001 From: Sam Ottenhoff Date: Wed, 20 Mar 2024 13:13:35 -0400 Subject: [PATCH] reverseproxy: configurable active health_passes and health_fails (#6154) * reverseproxy: active health check allows configurable health_passes and health_fails * Need to reset counters after recovery * rename methods to be more clear that these are coming from active health checks * do not export methods --- caddytest/integration/reverseproxy_test.go | 2 + modules/caddyhttp/reverseproxy/caddyfile.go | 34 +++++++++++ .../caddyhttp/reverseproxy/healthchecks.go | 56 ++++++++++++++++--- modules/caddyhttp/reverseproxy/hosts.go | 42 +++++++++++++- 4 files changed, 125 insertions(+), 9 deletions(-) diff --git a/caddytest/integration/reverseproxy_test.go b/caddytest/integration/reverseproxy_test.go index 0beb71afc..cbfe8433b 100644 --- a/caddytest/integration/reverseproxy_test.go +++ b/caddytest/integration/reverseproxy_test.go @@ -350,6 +350,8 @@ func TestReverseProxyHealthCheck(t *testing.T) { health_port 2021 health_interval 10ms health_timeout 100ms + health_passes 1 + health_fails 1 } } `, "caddyfile") diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 66bbcbcd4..93cc29568 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -69,6 +69,8 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // health_uri // health_port // health_interval +// health_passes +// health_fails // health_timeout // health_status // health_body @@ -447,6 +449,38 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } h.HealthChecks.Active.ExpectBody = d.Val() + case "health_passes": + if !d.NextArg() { + return d.ArgErr() + } + if h.HealthChecks == nil { + h.HealthChecks = new(HealthChecks) + } + if h.HealthChecks.Active == nil { + h.HealthChecks.Active = new(ActiveHealthChecks) + } + passes, err := strconv.Atoi(d.Val()) + if err != nil { + return d.Errf("invalid passes count '%s': %v", d.Val(), err) + } + h.HealthChecks.Active.Passes = passes + + case "health_fails": + if !d.NextArg() { + return d.ArgErr() + } + if h.HealthChecks == nil { + h.HealthChecks = new(HealthChecks) + } + if h.HealthChecks.Active == nil { + h.HealthChecks.Active = new(ActiveHealthChecks) + } + fails, err := strconv.Atoi(d.Val()) + if err != nil { + return d.Errf("invalid fails count '%s': %v", d.Val(), err) + } + h.HealthChecks.Active.Fails = fails + case "max_fails": if !d.NextArg() { return d.ArgErr() diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go index ad21ccb5c..507e67c88 100644 --- a/modules/caddyhttp/reverseproxy/healthchecks.go +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -89,6 +89,14 @@ type ActiveHealthChecks struct { // considering it unhealthy (default 5s). Timeout caddy.Duration `json:"timeout,omitempty"` + // Number of consecutive health check passes before marking + // a previously unhealthy backend as healthy again (default 1). + Passes int `json:"passes,omitempty"` + + // Number of consecutive health check failures before marking + // a previously healthy backend as unhealthy (default 1). + Fails int `json:"fails,omitempty"` + // The maximum response body to download from the backend // during a health check. MaxSize int64 `json:"max_size,omitempty"` @@ -167,6 +175,14 @@ func (a *ActiveHealthChecks) Provision(ctx caddy.Context, h *Handler) error { } } + if a.Passes < 1 { + a.Passes = 1 + } + + if a.Fails < 1 { + a.Fails = 1 + } + return nil } @@ -373,9 +389,37 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre } markUnhealthy := func() { - // dispatch an event that the host newly became unhealthy - if upstream.setHealthy(false) { - h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr}) + // increment failures and then check if it has reached the threshold to mark unhealthy + err := upstream.Host.countHealthFail(1) + if err != nil { + h.HealthChecks.Active.logger.Error("could not count active health failure", + zap.String("host", upstream.Dial), + zap.Error(err)) + return + } + if upstream.Host.activeHealthFails() >= h.HealthChecks.Active.Fails { + // dispatch an event that the host newly became unhealthy + if upstream.setHealthy(false) { + h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr}) + upstream.Host.resetHealth() + } + } + } + + markHealthy := func() { + // increment passes and then check if it has reached the threshold to be healthy + err := upstream.Host.countHealthPass(1) + if err != nil { + h.HealthChecks.Active.logger.Error("could not count active health pass", + zap.String("host", upstream.Dial), + zap.Error(err)) + return + } + if upstream.Host.activeHealthPasses() >= h.HealthChecks.Active.Passes { + if upstream.setHealthy(true) { + h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr}) + upstream.Host.resetHealth() + } } } @@ -439,10 +483,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre } // passed health check parameters, so mark as healthy - if upstream.setHealthy(true) { - h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr)) - h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr}) - } + h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr)) + markHealthy() return nil } diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index 83a39d807..be1146ac9 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -136,8 +136,10 @@ func (u *Upstream) fillHost() { // Host is the basic, in-memory representation of the state of a remote host. // Its fields are accessed atomically and Host values must not be copied. type Host struct { - numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) - fails int64 + numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) + fails int64 + activePasses int64 + activeFails int64 } // NumRequests returns the number of active requests to the upstream. @@ -150,6 +152,16 @@ func (h *Host) Fails() int { return int(atomic.LoadInt64(&h.fails)) } +// activeHealthPasses returns the number of consecutive active health check passes with the upstream. +func (h *Host) activeHealthPasses() int { + return int(atomic.LoadInt64(&h.activePasses)) +} + +// activeHealthFails returns the number of consecutive active health check failures with the upstream. +func (h *Host) activeHealthFails() int { + return int(atomic.LoadInt64(&h.activeFails)) +} + // countRequest mutates the active request count by // delta. It returns an error if the adjustment fails. func (h *Host) countRequest(delta int) error { @@ -170,6 +182,32 @@ func (h *Host) countFail(delta int) error { return nil } +// countHealthPass mutates the recent passes count by +// delta. It returns an error if the adjustment fails. +func (h *Host) countHealthPass(delta int) error { + result := atomic.AddInt64(&h.activePasses, int64(delta)) + if result < 0 { + return fmt.Errorf("count below 0: %d", result) + } + return nil +} + +// countHealthFail mutates the recent failures count by +// delta. It returns an error if the adjustment fails. +func (h *Host) countHealthFail(delta int) error { + result := atomic.AddInt64(&h.activeFails, int64(delta)) + if result < 0 { + return fmt.Errorf("count below 0: %d", result) + } + return nil +} + +// resetHealth resets the health check counters. +func (h *Host) resetHealth() { + atomic.StoreInt64(&h.activePasses, 0) + atomic.StoreInt64(&h.activeFails, 0) +} + // healthy returns true if the upstream is not actively marked as unhealthy. // (This returns the status only from the "active" health checks.) func (u *Upstream) healthy() bool {