reverse_proxy: Implement active health checks

This commit is contained in:
Matthew Holt 2019-09-03 12:10:11 -06:00
parent 026df7c5cb
commit ccfb12347b
No known key found for this signature in database
GPG Key ID: 2A349DD577D586A5
3 changed files with 309 additions and 74 deletions

View File

@ -0,0 +1,220 @@
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reverseproxy
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"time"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
)
type HealthChecks struct {
Active *ActiveHealthChecks `json:"active,omitempty"`
Passive *PassiveHealthChecks `json:"passive,omitempty"`
}
type ActiveHealthChecks struct {
Path string `json:"path,omitempty"`
Port int `json:"port,omitempty"`
Interval caddy.Duration `json:"interval,omitempty"`
Timeout caddy.Duration `json:"timeout,omitempty"`
MaxSize int64 `json:"max_size,omitempty"`
ExpectStatus int `json:"expect_status,omitempty"`
ExpectBody string `json:"expect_body,omitempty"`
stopChan chan struct{}
httpClient *http.Client
bodyRegexp *regexp.Regexp
}
type PassiveHealthChecks struct {
MaxFails int `json:"max_fails,omitempty"`
FailDuration caddy.Duration `json:"fail_duration,omitempty"`
UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"`
UnhealthyStatus []int `json:"unhealthy_status,omitempty"`
UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"`
}
func (h *Handler) activeHealthChecker() {
ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval))
h.doActiveHealthChecksForAllHosts()
for {
select {
case <-ticker.C:
h.doActiveHealthChecksForAllHosts()
case <-h.HealthChecks.Active.stopChan:
ticker.Stop()
return
}
}
}
func (h *Handler) doActiveHealthChecksForAllHosts() {
hosts.Range(func(key, value interface{}) bool {
addr := key.(string)
host := value.(Host)
go func(addr string, host Host) {
err := h.doActiveHealthCheck(addr, host)
if err != nil {
log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", addr, err)
}
}(addr, host)
// continue to iterate all hosts
return true
})
}
// doActiveHealthCheck performs a health check to host which
// can be reached at address hostAddr. The actual address for
// the request will be built according to active health checker
// config. The health status of the host will be updated
// according to whether it passes the health check. An error is
// returned only if the health check fails to occur or if marking
// the host's health status fails.
func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error {
// create the URL for the health check
u, err := url.Parse(hostAddr)
if err != nil {
return err
}
if h.HealthChecks.Active.Path != "" {
u.Path = h.HealthChecks.Active.Path
}
if h.HealthChecks.Active.Port != 0 {
portStr := strconv.Itoa(h.HealthChecks.Active.Port)
u.Host = net.JoinHostPort(u.Hostname(), portStr)
}
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return err
}
// do the request, careful to tame the response body
resp, err := h.HealthChecks.Active.httpClient.Do(req)
if err != nil {
log.Printf("[INFO] reverse_proxy: active health check: %s is down (HTTP request failed: %v)", hostAddr, err)
_, err2 := host.SetHealthy(false)
if err2 != nil {
return fmt.Errorf("marking unhealthy: %v", err2)
}
return nil
}
var body io.Reader = resp.Body
if h.HealthChecks.Active.MaxSize > 0 {
body = io.LimitReader(body, h.HealthChecks.Active.MaxSize)
}
defer func() {
// drain any remaining body so connection can be re-used
io.Copy(ioutil.Discard, body)
resp.Body.Close()
}()
// if status code is outside criteria, mark down
if h.HealthChecks.Active.ExpectStatus > 0 {
if !caddyhttp.StatusCodeMatches(resp.StatusCode, h.HealthChecks.Active.ExpectStatus) {
log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d unexpected)", hostAddr, resp.StatusCode)
_, err := host.SetHealthy(false)
if err != nil {
return fmt.Errorf("marking unhealthy: %v", err)
}
return nil
}
} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d out of tolerances)", hostAddr, resp.StatusCode)
_, err := host.SetHealthy(false)
if err != nil {
return fmt.Errorf("marking unhealthy: %v", err)
}
return nil
}
// if body does not match regex, mark down
if h.HealthChecks.Active.bodyRegexp != nil {
bodyBytes, err := ioutil.ReadAll(body)
if err != nil {
log.Printf("[INFO] reverse_proxy: active health check: %s is down (failed to read response body)", hostAddr)
_, err := host.SetHealthy(false)
if err != nil {
return fmt.Errorf("marking unhealthy: %v", err)
}
return nil
}
if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) {
log.Printf("[INFO] reverse_proxy: active health check: %s is down (response body failed expectations)", hostAddr)
_, err := host.SetHealthy(false)
if err != nil {
return fmt.Errorf("marking unhealthy: %v", err)
}
return nil
}
}
// passed health check parameters, so mark as healthy
swapped, err := host.SetHealthy(true)
if swapped {
log.Printf("[INFO] reverse_proxy: active health check: %s is back up", hostAddr)
}
if err != nil {
return fmt.Errorf("marking healthy: %v", err)
}
return nil
}
// countFailure is used with passive health checks. It
// remembers 1 failure for upstream for the configured
// duration. If passive health checks are disabled or
// failure expiry is 0, this is a no-op.
func (h Handler) countFailure(upstream *Upstream) {
// only count failures if passive health checking is enabled
// and if failures are configured have a non-zero expiry
if h.HealthChecks == nil || h.HealthChecks.Passive == nil {
return
}
failDuration := time.Duration(h.HealthChecks.Passive.FailDuration)
if failDuration == 0 {
return
}
// count failure immediately
err := upstream.Host.CountFail(1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: counting failure: %v",
upstream.hostURL, err)
}
// forget it later
go func(host Host, failDuration time.Duration) {
time.Sleep(failDuration)
err := host.CountFail(-1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v",
upstream.hostURL, err)
}
}(upstream.Host, failDuration)
}

View File

@ -15,15 +15,14 @@
package reverseproxy package reverseproxy
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"regexp"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -90,11 +89,41 @@ func (h *Handler) Provision(ctx caddy.Context) error {
if h.LoadBalancing.TryDuration > 0 && h.LoadBalancing.TryInterval == 0 { if h.LoadBalancing.TryDuration > 0 && h.LoadBalancing.TryInterval == 0 {
// a non-zero try_duration with a zero try_interval // a non-zero try_duration with a zero try_interval
// will always spin the CPU for try_duration if the // will always spin the CPU for try_duration if the
// upstream is local or low-latency; default to some // upstream is local or low-latency; avoid that by
// sane waiting period before try attempts // defaulting to a sane wait period between attempts
h.LoadBalancing.TryInterval = caddy.Duration(250 * time.Millisecond) h.LoadBalancing.TryInterval = caddy.Duration(250 * time.Millisecond)
} }
// if active health checks are enabled, configure them and start a worker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
timeout := time.Duration(h.HealthChecks.Active.Timeout)
if timeout == 0 {
timeout = 10 * time.Second
}
h.HealthChecks.Active.stopChan = make(chan struct{})
h.HealthChecks.Active.httpClient = &http.Client{
Timeout: timeout,
Transport: h.Transport,
}
if h.HealthChecks.Active.Interval == 0 {
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
}
if h.HealthChecks.Active.ExpectBody != "" {
var err error
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
if err != nil {
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
}
}
go h.activeHealthChecker()
}
for _, upstream := range h.Upstreams { for _, upstream := range h.Upstreams {
// url parser requires a scheme // url parser requires a scheme
if !strings.Contains(upstream.Address, "://") { if !strings.Contains(upstream.Address, "://") {
@ -130,8 +159,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount
} }
// TODO: active health checks
if h.HealthChecks != nil { if h.HealthChecks != nil {
// upstreams need independent access to the passive // upstreams need independent access to the passive
// health check policy so they can, you know, passively // health check policy so they can, you know, passively
@ -143,11 +170,20 @@ func (h *Handler) Provision(ctx caddy.Context) error {
return nil return nil
} }
// Cleanup cleans up the resources made by h during provisioning.
func (h *Handler) Cleanup() error { func (h *Handler) Cleanup() error {
// TODO: finish this up, make sure it takes care of any active health checkers or whatever // stop the active health checker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
h.HealthChecks.Active.stopChan != nil {
close(h.HealthChecks.Active.stopChan)
}
// remove hosts from our config from the pool
for _, upstream := range h.Upstreams { for _, upstream := range h.Upstreams {
hosts.Delete(upstream.hostURL.String()) hosts.Delete(upstream.hostURL.String())
} }
return nil return nil
} }
@ -539,38 +575,6 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er
} }
} }
// countFailure remembers 1 failure for upstream for the
// configured duration. If passive health checks are
// disabled or failure expiry is 0, this is a no-op.
func (h Handler) countFailure(upstream *Upstream) {
// only count failures if passive health checking is enabled
// and if failures are configured have a non-zero expiry
if h.HealthChecks == nil || h.HealthChecks.Passive == nil {
return
}
failDuration := time.Duration(h.HealthChecks.Passive.FailDuration)
if failDuration == 0 {
return
}
// count failure immediately
err := upstream.Host.CountFail(1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: counting failure: %v",
upstream.hostURL, err)
}
// forget it later
go func(host Host, failDuration time.Duration) {
time.Sleep(failDuration)
err := host.CountFail(-1)
if err != nil {
log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v",
upstream.hostURL, err)
}
}(upstream.Host, failDuration)
}
type writeFlusher interface { type writeFlusher interface {
io.Writer io.Writer
http.Flusher http.Flusher
@ -722,29 +726,6 @@ type Selector interface {
Select(HostPool, *http.Request) *Upstream Select(HostPool, *http.Request) *Upstream
} }
type HealthChecks struct {
Active *ActiveHealthChecks `json:"active,omitempty"`
Passive *PassiveHealthChecks `json:"passive,omitempty"`
}
type ActiveHealthChecks struct {
Path string `json:"path,omitempty"`
Port int `json:"port,omitempty"`
Interval caddy.Duration `json:"interval,omitempty"`
Timeout caddy.Duration `json:"timeout,omitempty"`
MaxSize int `json:"max_size,omitempty"`
ExpectStatus int `json:"expect_status,omitempty"`
ExpectBody string `json:"expect_body,omitempty"`
}
type PassiveHealthChecks struct {
MaxFails int `json:"max_fails,omitempty"`
FailDuration caddy.Duration `json:"fail_duration,omitempty"`
UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"`
UnhealthyStatus []int `json:"unhealthy_status,omitempty"`
UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"`
}
// Hop-by-hop headers. These are removed when sent to the backend. // Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the // As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the // Connection header field. These are the headers defined by the
@ -762,22 +743,33 @@ var hopHeaders = []string{
"Upgrade", "Upgrade",
} }
var bufPool = sync.Pool{ // Host represents a remote host which can be proxied to.
New: func() interface{} { // Its methods must be safe for concurrent use.
return new(bytes.Buffer)
},
}
//////////////////////////////////
// TODO:
type Host interface { type Host interface {
// NumRequests returns the numnber of requests
// currently in process with the host.
NumRequests() int NumRequests() int
// Fails returns the count of recent failures.
Fails() int Fails() int
// Unhealthy returns true if the backend is unhealthy.
Unhealthy() bool Unhealthy() bool
// CountRequest counts the given number of requests
// as currently in process with the host. The count
// should not go below 0.
CountRequest(int) error CountRequest(int) error
// CountFail counts the given number of failures
// with the host. The count should not go below 0.
CountFail(int) error CountFail(int) error
// SetHealthy marks the host as either healthy (true)
// or unhealthy (false). If the given status is the
// same, this should be a no-op. It returns true if
// the given status was different, false otherwise.
SetHealthy(bool) (bool, error)
} }
type HostPool []*Upstream type HostPool []*Upstream
@ -788,13 +780,13 @@ type upstreamHost struct {
unhealthy int32 unhealthy int32
} }
func (uh upstreamHost) NumRequests() int { func (uh *upstreamHost) NumRequests() int {
return int(atomic.LoadInt64(&uh.numRequests)) return int(atomic.LoadInt64(&uh.numRequests))
} }
func (uh upstreamHost) Fails() int { func (uh *upstreamHost) Fails() int {
return int(atomic.LoadInt64(&uh.fails)) return int(atomic.LoadInt64(&uh.fails))
} }
func (uh upstreamHost) Unhealthy() bool { func (uh *upstreamHost) Unhealthy() bool {
return atomic.LoadInt32(&uh.unhealthy) == 1 return atomic.LoadInt32(&uh.unhealthy) == 1
} }
func (uh *upstreamHost) CountRequest(delta int) error { func (uh *upstreamHost) CountRequest(delta int) error {
@ -811,6 +803,14 @@ func (uh *upstreamHost) CountFail(delta int) error {
} }
return nil return nil
} }
func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) {
var unhealthy, compare int32 = 1, 0
if healthy {
unhealthy, compare = 0, 1
}
swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy)
return swapped, nil
}
type Upstream struct { type Upstream struct {
Host `json:"-"` Host `json:"-"`
@ -854,6 +854,13 @@ var hosts = caddy.NewUsagePool()
type UpstreamProvider interface { type UpstreamProvider interface {
} }
// TODO: see if we can use this
// var bufPool = sync.Pool{
// New: func() interface{} {
// return new(bytes.Buffer)
// },
// }
// Interface guards // Interface guards
var ( var (
_ caddyhttp.MiddlewareHandler = (*Handler)(nil) _ caddyhttp.MiddlewareHandler = (*Handler)(nil)

View File

@ -80,6 +80,14 @@ func (up *UsagePool) LoadOrStore(key, val interface{}) (actual interface{}, load
return return
} }
// Range iterates the pool the same way sync.Map.Range does.
// This does not affect usage counts.
func (up *UsagePool) Range(f func(key, value interface{}) bool) {
up.pool.Range(func(key, value interface{}) bool {
return f(key, value.(*usagePoolVal).value)
})
}
type usagePoolVal struct { type usagePoolVal struct {
usage int32 // accessed atomically; must be 64-bit aligned for 32-bit systems usage int32 // accessed atomically; must be 64-bit aligned for 32-bit systems
value interface{} value interface{}