diff --git a/modules/caddyhttp/reverseproxy/healthchecks.go b/modules/caddyhttp/reverseproxy/healthchecks.go new file mode 100644 index 000000000..96649a4ee --- /dev/null +++ b/modules/caddyhttp/reverseproxy/healthchecks.go @@ -0,0 +1,220 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reverseproxy + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "regexp" + "strconv" + "time" + + "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/modules/caddyhttp" +) + +type HealthChecks struct { + Active *ActiveHealthChecks `json:"active,omitempty"` + Passive *PassiveHealthChecks `json:"passive,omitempty"` +} + +type ActiveHealthChecks struct { + Path string `json:"path,omitempty"` + Port int `json:"port,omitempty"` + Interval caddy.Duration `json:"interval,omitempty"` + Timeout caddy.Duration `json:"timeout,omitempty"` + MaxSize int64 `json:"max_size,omitempty"` + ExpectStatus int `json:"expect_status,omitempty"` + ExpectBody string `json:"expect_body,omitempty"` + + stopChan chan struct{} + httpClient *http.Client + bodyRegexp *regexp.Regexp +} + +type PassiveHealthChecks struct { + MaxFails int `json:"max_fails,omitempty"` + FailDuration caddy.Duration `json:"fail_duration,omitempty"` + UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"` + UnhealthyStatus []int `json:"unhealthy_status,omitempty"` + UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` +} + +func (h *Handler) activeHealthChecker() { + ticker := time.NewTicker(time.Duration(h.HealthChecks.Active.Interval)) + h.doActiveHealthChecksForAllHosts() + for { + select { + case <-ticker.C: + h.doActiveHealthChecksForAllHosts() + case <-h.HealthChecks.Active.stopChan: + ticker.Stop() + return + } + } +} + +func (h *Handler) doActiveHealthChecksForAllHosts() { + hosts.Range(func(key, value interface{}) bool { + addr := key.(string) + host := value.(Host) + + go func(addr string, host Host) { + err := h.doActiveHealthCheck(addr, host) + if err != nil { + log.Printf("[ERROR] reverse_proxy: active health check for host %s: %v", addr, err) + } + }(addr, host) + + // continue to iterate all hosts + return true + }) +} + +// doActiveHealthCheck performs a health check to host which +// can be reached at address hostAddr. The actual address for +// the request will be built according to active health checker +// config. The health status of the host will be updated +// according to whether it passes the health check. An error is +// returned only if the health check fails to occur or if marking +// the host's health status fails. +func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error { + // create the URL for the health check + u, err := url.Parse(hostAddr) + if err != nil { + return err + } + if h.HealthChecks.Active.Path != "" { + u.Path = h.HealthChecks.Active.Path + } + if h.HealthChecks.Active.Port != 0 { + portStr := strconv.Itoa(h.HealthChecks.Active.Port) + u.Host = net.JoinHostPort(u.Hostname(), portStr) + } + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + + // do the request, careful to tame the response body + resp, err := h.HealthChecks.Active.httpClient.Do(req) + if err != nil { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (HTTP request failed: %v)", hostAddr, err) + _, err2 := host.SetHealthy(false) + if err2 != nil { + return fmt.Errorf("marking unhealthy: %v", err2) + } + return nil + } + var body io.Reader = resp.Body + if h.HealthChecks.Active.MaxSize > 0 { + body = io.LimitReader(body, h.HealthChecks.Active.MaxSize) + } + defer func() { + // drain any remaining body so connection can be re-used + io.Copy(ioutil.Discard, body) + resp.Body.Close() + }() + + // if status code is outside criteria, mark down + if h.HealthChecks.Active.ExpectStatus > 0 { + if !caddyhttp.StatusCodeMatches(resp.StatusCode, h.HealthChecks.Active.ExpectStatus) { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d unexpected)", hostAddr, resp.StatusCode) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (status code %d out of tolerances)", hostAddr, resp.StatusCode) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + + // if body does not match regex, mark down + if h.HealthChecks.Active.bodyRegexp != nil { + bodyBytes, err := ioutil.ReadAll(body) + if err != nil { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (failed to read response body)", hostAddr) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + if !h.HealthChecks.Active.bodyRegexp.Match(bodyBytes) { + log.Printf("[INFO] reverse_proxy: active health check: %s is down (response body failed expectations)", hostAddr) + _, err := host.SetHealthy(false) + if err != nil { + return fmt.Errorf("marking unhealthy: %v", err) + } + return nil + } + } + + // passed health check parameters, so mark as healthy + swapped, err := host.SetHealthy(true) + if swapped { + log.Printf("[INFO] reverse_proxy: active health check: %s is back up", hostAddr) + } + if err != nil { + return fmt.Errorf("marking healthy: %v", err) + } + + return nil +} + +// countFailure is used with passive health checks. It +// remembers 1 failure for upstream for the configured +// duration. If passive health checks are disabled or +// failure expiry is 0, this is a no-op. +func (h Handler) countFailure(upstream *Upstream) { + // only count failures if passive health checking is enabled + // and if failures are configured have a non-zero expiry + if h.HealthChecks == nil || h.HealthChecks.Passive == nil { + return + } + failDuration := time.Duration(h.HealthChecks.Passive.FailDuration) + if failDuration == 0 { + return + } + + // count failure immediately + err := upstream.Host.CountFail(1) + if err != nil { + log.Printf("[ERROR] proxy: upstream %s: counting failure: %v", + upstream.hostURL, err) + } + + // forget it later + go func(host Host, failDuration time.Duration) { + time.Sleep(failDuration) + err := host.CountFail(-1) + if err != nil { + log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v", + upstream.hostURL, err) + } + }(upstream.Host, failDuration) +} diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index e312d714a..ebf6ac199 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -15,15 +15,14 @@ package reverseproxy import ( - "bytes" "context" "encoding/json" "fmt" "io" - "log" "net" "net/http" "net/url" + "regexp" "strings" "sync" "sync/atomic" @@ -90,11 +89,41 @@ func (h *Handler) Provision(ctx caddy.Context) error { if h.LoadBalancing.TryDuration > 0 && h.LoadBalancing.TryInterval == 0 { // a non-zero try_duration with a zero try_interval // will always spin the CPU for try_duration if the - // upstream is local or low-latency; default to some - // sane waiting period before try attempts + // upstream is local or low-latency; avoid that by + // defaulting to a sane wait period between attempts h.LoadBalancing.TryInterval = caddy.Duration(250 * time.Millisecond) } + // if active health checks are enabled, configure them and start a worker + if h.HealthChecks != nil && + h.HealthChecks.Active != nil && + (h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) { + timeout := time.Duration(h.HealthChecks.Active.Timeout) + if timeout == 0 { + timeout = 10 * time.Second + } + + h.HealthChecks.Active.stopChan = make(chan struct{}) + h.HealthChecks.Active.httpClient = &http.Client{ + Timeout: timeout, + Transport: h.Transport, + } + + if h.HealthChecks.Active.Interval == 0 { + h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second) + } + + if h.HealthChecks.Active.ExpectBody != "" { + var err error + h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody) + if err != nil { + return fmt.Errorf("expect_body: compiling regular expression: %v", err) + } + } + + go h.activeHealthChecker() + } + for _, upstream := range h.Upstreams { // url parser requires a scheme if !strings.Contains(upstream.Address, "://") { @@ -130,8 +159,6 @@ func (h *Handler) Provision(ctx caddy.Context) error { upstream.MaxRequests = h.HealthChecks.Passive.UnhealthyRequestCount } - // TODO: active health checks - if h.HealthChecks != nil { // upstreams need independent access to the passive // health check policy so they can, you know, passively @@ -143,11 +170,20 @@ func (h *Handler) Provision(ctx caddy.Context) error { return nil } +// Cleanup cleans up the resources made by h during provisioning. func (h *Handler) Cleanup() error { - // TODO: finish this up, make sure it takes care of any active health checkers or whatever + // stop the active health checker + if h.HealthChecks != nil && + h.HealthChecks.Active != nil && + h.HealthChecks.Active.stopChan != nil { + close(h.HealthChecks.Active.stopChan) + } + + // remove hosts from our config from the pool for _, upstream := range h.Upstreams { hosts.Delete(upstream.hostURL.String()) } + return nil } @@ -539,38 +575,6 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er } } -// countFailure remembers 1 failure for upstream for the -// configured duration. If passive health checks are -// disabled or failure expiry is 0, this is a no-op. -func (h Handler) countFailure(upstream *Upstream) { - // only count failures if passive health checking is enabled - // and if failures are configured have a non-zero expiry - if h.HealthChecks == nil || h.HealthChecks.Passive == nil { - return - } - failDuration := time.Duration(h.HealthChecks.Passive.FailDuration) - if failDuration == 0 { - return - } - - // count failure immediately - err := upstream.Host.CountFail(1) - if err != nil { - log.Printf("[ERROR] proxy: upstream %s: counting failure: %v", - upstream.hostURL, err) - } - - // forget it later - go func(host Host, failDuration time.Duration) { - time.Sleep(failDuration) - err := host.CountFail(-1) - if err != nil { - log.Printf("[ERROR] proxy: upstream %s: expiring failure: %v", - upstream.hostURL, err) - } - }(upstream.Host, failDuration) -} - type writeFlusher interface { io.Writer http.Flusher @@ -722,29 +726,6 @@ type Selector interface { Select(HostPool, *http.Request) *Upstream } -type HealthChecks struct { - Active *ActiveHealthChecks `json:"active,omitempty"` - Passive *PassiveHealthChecks `json:"passive,omitempty"` -} - -type ActiveHealthChecks struct { - Path string `json:"path,omitempty"` - Port int `json:"port,omitempty"` - Interval caddy.Duration `json:"interval,omitempty"` - Timeout caddy.Duration `json:"timeout,omitempty"` - MaxSize int `json:"max_size,omitempty"` - ExpectStatus int `json:"expect_status,omitempty"` - ExpectBody string `json:"expect_body,omitempty"` -} - -type PassiveHealthChecks struct { - MaxFails int `json:"max_fails,omitempty"` - FailDuration caddy.Duration `json:"fail_duration,omitempty"` - UnhealthyRequestCount int `json:"unhealthy_request_count,omitempty"` - UnhealthyStatus []int `json:"unhealthy_status,omitempty"` - UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` -} - // Hop-by-hop headers. These are removed when sent to the backend. // As of RFC 7230, hop-by-hop headers are required to appear in the // Connection header field. These are the headers defined by the @@ -762,22 +743,33 @@ var hopHeaders = []string{ "Upgrade", } -var bufPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - -////////////////////////////////// -// TODO: - +// Host represents a remote host which can be proxied to. +// Its methods must be safe for concurrent use. type Host interface { + // NumRequests returns the numnber of requests + // currently in process with the host. NumRequests() int + + // Fails returns the count of recent failures. Fails() int + + // Unhealthy returns true if the backend is unhealthy. Unhealthy() bool + // CountRequest counts the given number of requests + // as currently in process with the host. The count + // should not go below 0. CountRequest(int) error + + // CountFail counts the given number of failures + // with the host. The count should not go below 0. CountFail(int) error + + // SetHealthy marks the host as either healthy (true) + // or unhealthy (false). If the given status is the + // same, this should be a no-op. It returns true if + // the given status was different, false otherwise. + SetHealthy(bool) (bool, error) } type HostPool []*Upstream @@ -788,13 +780,13 @@ type upstreamHost struct { unhealthy int32 } -func (uh upstreamHost) NumRequests() int { +func (uh *upstreamHost) NumRequests() int { return int(atomic.LoadInt64(&uh.numRequests)) } -func (uh upstreamHost) Fails() int { +func (uh *upstreamHost) Fails() int { return int(atomic.LoadInt64(&uh.fails)) } -func (uh upstreamHost) Unhealthy() bool { +func (uh *upstreamHost) Unhealthy() bool { return atomic.LoadInt32(&uh.unhealthy) == 1 } func (uh *upstreamHost) CountRequest(delta int) error { @@ -811,6 +803,14 @@ func (uh *upstreamHost) CountFail(delta int) error { } return nil } +func (uh *upstreamHost) SetHealthy(healthy bool) (bool, error) { + var unhealthy, compare int32 = 1, 0 + if healthy { + unhealthy, compare = 0, 1 + } + swapped := atomic.CompareAndSwapInt32(&uh.unhealthy, compare, unhealthy) + return swapped, nil +} type Upstream struct { Host `json:"-"` @@ -854,6 +854,13 @@ var hosts = caddy.NewUsagePool() type UpstreamProvider interface { } +// TODO: see if we can use this +// var bufPool = sync.Pool{ +// New: func() interface{} { +// return new(bytes.Buffer) +// }, +// } + // Interface guards var ( _ caddyhttp.MiddlewareHandler = (*Handler)(nil) diff --git a/usagepool.go b/usagepool.go index 3b8e975f0..dd4f60671 100644 --- a/usagepool.go +++ b/usagepool.go @@ -80,6 +80,14 @@ func (up *UsagePool) LoadOrStore(key, val interface{}) (actual interface{}, load return } +// Range iterates the pool the same way sync.Map.Range does. +// This does not affect usage counts. +func (up *UsagePool) Range(f func(key, value interface{}) bool) { + up.pool.Range(func(key, value interface{}) bool { + return f(key, value.(*usagePoolVal).value) + }) +} + type usagePoolVal struct { usage int32 // accessed atomically; must be 64-bit aligned for 32-bit systems value interface{}