reverseproxy: configurable active health_passes and health_fails (#6154)

* reverseproxy: active health check allows configurable health_passes and health_fails

* Need to reset counters after recovery

* rename methods to be more clear that these are coming from active health checks

* do not export methods
This commit is contained in:
Sam Ottenhoff 2024-03-20 13:13:35 -04:00 committed by GitHub
parent a9768d2fde
commit e65b97f55b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 125 additions and 9 deletions

View File

@ -350,6 +350,8 @@ func TestReverseProxyHealthCheck(t *testing.T) {
health_port 2021 health_port 2021
health_interval 10ms health_interval 10ms
health_timeout 100ms health_timeout 100ms
health_passes 1
health_fails 1
} }
} }
`, "caddyfile") `, "caddyfile")

View File

@ -69,6 +69,8 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// health_uri <uri> // health_uri <uri>
// health_port <port> // health_port <port>
// health_interval <interval> // health_interval <interval>
// health_passes <num>
// health_fails <num>
// health_timeout <duration> // health_timeout <duration>
// health_status <status> // health_status <status>
// health_body <regexp> // health_body <regexp>
@ -447,6 +449,38 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
} }
h.HealthChecks.Active.ExpectBody = d.Val() h.HealthChecks.Active.ExpectBody = d.Val()
case "health_passes":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
passes, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid passes count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Passes = passes
case "health_fails":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
fails, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid fails count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Fails = fails
case "max_fails": case "max_fails":
if !d.NextArg() { if !d.NextArg() {
return d.ArgErr() return d.ArgErr()

View File

@ -89,6 +89,14 @@ type ActiveHealthChecks struct {
// considering it unhealthy (default 5s). // considering it unhealthy (default 5s).
Timeout caddy.Duration `json:"timeout,omitempty"` Timeout caddy.Duration `json:"timeout,omitempty"`
// Number of consecutive health check passes before marking
// a previously unhealthy backend as healthy again (default 1).
Passes int `json:"passes,omitempty"`
// Number of consecutive health check failures before marking
// a previously healthy backend as unhealthy (default 1).
Fails int `json:"fails,omitempty"`
// The maximum response body to download from the backend // The maximum response body to download from the backend
// during a health check. // during a health check.
MaxSize int64 `json:"max_size,omitempty"` MaxSize int64 `json:"max_size,omitempty"`
@ -167,6 +175,14 @@ func (a *ActiveHealthChecks) Provision(ctx caddy.Context, h *Handler) error {
} }
} }
if a.Passes < 1 {
a.Passes = 1
}
if a.Fails < 1 {
a.Fails = 1
}
return nil return nil
} }
@ -373,9 +389,37 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
} }
markUnhealthy := func() { markUnhealthy := func() {
// increment failures and then check if it has reached the threshold to mark unhealthy
err := upstream.Host.countHealthFail(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health failure",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthFails() >= h.HealthChecks.Active.Fails {
// dispatch an event that the host newly became unhealthy // dispatch an event that the host newly became unhealthy
if upstream.setHealthy(false) { if upstream.setHealthy(false) {
h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr}) h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
}
}
markHealthy := func() {
// increment passes and then check if it has reached the threshold to be healthy
err := upstream.Host.countHealthPass(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health pass",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthPasses() >= h.HealthChecks.Active.Passes {
if upstream.setHealthy(true) {
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
} }
} }
@ -439,10 +483,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
} }
// passed health check parameters, so mark as healthy // passed health check parameters, so mark as healthy
if upstream.setHealthy(true) {
h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr)) h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr}) markHealthy()
}
return nil return nil
} }

View File

@ -138,6 +138,8 @@ func (u *Upstream) fillHost() {
type Host struct { type Host struct {
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG) numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64 fails int64
activePasses int64
activeFails int64
} }
// NumRequests returns the number of active requests to the upstream. // NumRequests returns the number of active requests to the upstream.
@ -150,6 +152,16 @@ func (h *Host) Fails() int {
return int(atomic.LoadInt64(&h.fails)) return int(atomic.LoadInt64(&h.fails))
} }
// activeHealthPasses returns the number of consecutive active health check passes with the upstream.
func (h *Host) activeHealthPasses() int {
return int(atomic.LoadInt64(&h.activePasses))
}
// activeHealthFails returns the number of consecutive active health check failures with the upstream.
func (h *Host) activeHealthFails() int {
return int(atomic.LoadInt64(&h.activeFails))
}
// countRequest mutates the active request count by // countRequest mutates the active request count by
// delta. It returns an error if the adjustment fails. // delta. It returns an error if the adjustment fails.
func (h *Host) countRequest(delta int) error { func (h *Host) countRequest(delta int) error {
@ -170,6 +182,32 @@ func (h *Host) countFail(delta int) error {
return nil return nil
} }
// countHealthPass mutates the recent passes count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthPass(delta int) error {
result := atomic.AddInt64(&h.activePasses, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}
// countHealthFail mutates the recent failures count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthFail(delta int) error {
result := atomic.AddInt64(&h.activeFails, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}
// resetHealth resets the health check counters.
func (h *Host) resetHealth() {
atomic.StoreInt64(&h.activePasses, 0)
atomic.StoreInt64(&h.activeFails, 0)
}
// healthy returns true if the upstream is not actively marked as unhealthy. // healthy returns true if the upstream is not actively marked as unhealthy.
// (This returns the status only from the "active" health checks.) // (This returns the status only from the "active" health checks.)
func (u *Upstream) healthy() bool { func (u *Upstream) healthy() bool {