diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index a2c85f905..f746ee509 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -52,73 +52,73 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// reverse_proxy [] [] { -// # backends -// to -// dynamic [...] +// reverse_proxy [] [] { +// # backends +// to +// dynamic [...] // -// # load balancing -// lb_policy [] -// lb_retries -// lb_try_duration -// lb_try_interval -// lb_retry_match +// # load balancing +// lb_policy [] +// lb_retries +// lb_try_duration +// lb_try_interval +// lb_retry_match // -// # active health checking -// health_uri -// health_port -// health_interval -// health_timeout -// health_status -// health_body -// health_headers { -// [] -// } +// # active health checking +// health_uri +// health_port +// health_interval +// health_timeout +// health_status +// health_body +// health_headers { +// [] +// } // -// # passive health checking -// fail_duration -// max_fails -// unhealthy_status -// unhealthy_latency -// unhealthy_request_count +// # passive health checking +// fail_duration +// max_fails +// unhealthy_status +// unhealthy_latency +// unhealthy_request_count // -// # streaming -// flush_interval -// buffer_requests -// buffer_responses -// max_buffer_size +// # streaming +// flush_interval +// buffer_requests +// buffer_responses +// max_buffer_size // -// # request manipulation -// trusted_proxies [private_ranges] -// header_up [+|-] [ []] -// header_down [+|-] [ []] -// method -// rewrite +// # request manipulation +// trusted_proxies [private_ranges] +// header_up [+|-] [ []] +// header_down [+|-] [ []] +// method +// rewrite // -// # round trip -// transport { -// ... -// } +// # round trip +// transport { +// ... +// } // -// # optionally intercept responses from upstream -// @name { -// status -// header [] -// } -// replace_status [] -// handle_response [] { -// +// # optionally intercept responses from upstream +// @name { +// status +// header [] +// } +// replace_status [] +// handle_response [] { +// // -// # special directives only available in handle_response -// copy_response [] [] { -// status -// } -// copy_response_headers [] { -// include -// exclude -// } -// } -// } +// # special directives only available in handle_response +// copy_response [] [] { +// status +// } +// copy_response_headers [] { +// include +// exclude +// } +// } +// } // // Proxy upstream addresses should be network dial addresses such // as `host:port`, or a URL such as `scheme://host:port`. Scheme @@ -824,33 +824,32 @@ func (h *Handler) FinalizeUnmarshalCaddyfile(helper httpcaddyfile.Helper) error // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// transport http { -// read_buffer -// write_buffer -// max_response_header -// dial_timeout -// dial_fallback_delay -// response_header_timeout -// expect_continue_timeout -// resolvers -// tls -// tls_client_auth | -// tls_insecure_skip_verify -// tls_timeout -// tls_trusted_ca_certs -// tls_server_name -// tls_renegotiation -// tls_except_ports -// keepalive [off|] -// keepalive_interval -// keepalive_idle_conns -// keepalive_idle_conns_per_host -// versions -// compression off -// max_conns_per_host -// max_idle_conns_per_host -// } -// +// transport http { +// read_buffer +// write_buffer +// max_response_header +// dial_timeout +// dial_fallback_delay +// response_header_timeout +// expect_continue_timeout +// resolvers +// tls +// tls_client_auth | +// tls_insecure_skip_verify +// tls_timeout +// tls_trusted_ca_certs +// tls_server_name +// tls_renegotiation +// tls_except_ports +// keepalive [off|] +// keepalive_interval +// keepalive_idle_conns +// keepalive_idle_conns_per_host +// versions +// compression off +// max_conns_per_host +// max_idle_conns_per_host +// } func (h *HTTPTransport) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { for d.NextBlock(0) { @@ -1138,10 +1137,9 @@ func parseCopyResponseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHan // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response [] [] { -// status -// } -// +// copy_response [] [] { +// status +// } func (h *CopyResponseHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1178,11 +1176,10 @@ func parseCopyResponseHeadersCaddyfile(h httpcaddyfile.Helper) (caddyhttp.Middle // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // -// copy_response_headers [] { -// include -// exclude -// } -// +// copy_response_headers [] { +// include +// exclude +// } func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1208,16 +1205,15 @@ func (h *CopyResponseHeadersHandler) UnmarshalCaddyfile(d *caddyfile.Dispenser) // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic srv [] { -// service -// proto -// name -// refresh -// resolvers -// dial_timeout -// dial_fallback_delay -// } -// +// dynamic srv [] { +// service +// proto +// name +// refresh +// resolvers +// dial_timeout +// dial_fallback_delay +// } func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1307,15 +1303,14 @@ func (u *SRVUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { // UnmarshalCaddyfile deserializes Caddyfile tokens into h. // -// dynamic a [ -// port -// refresh -// resolvers -// dial_timeout -// dial_fallback_delay -// } -// +// dynamic a [ +// port +// refresh +// resolvers +// dial_timeout +// dial_fallback_delay +// } func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { for d.Next() { args := d.RemainingArgs() @@ -1324,7 +1319,9 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } if len(args) > 0 { u.Name = args[0] - u.Port = args[1] + if len(args) == 2 { + u.Port = args[1] + } } for d.NextBlock(0) { @@ -1395,6 +1392,35 @@ func (u *AUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return nil } +// UnmarshalCaddyfile deserializes Caddyfile tokens into h. +// +// dynamic multi { +// [...] +// } +func (u *MultiUpstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { + for d.Next() { + if d.NextArg() { + return d.ArgErr() + } + + for nesting := d.Nesting(); d.NextBlock(nesting); { + dynModule := d.Val() + modID := "http.reverse_proxy.upstreams." + dynModule + unm, err := caddyfile.UnmarshalModule(d, modID) + if err != nil { + return err + } + source, ok := unm.(UpstreamSource) + if !ok { + return d.Errf("module %s (%T) is not an UpstreamSource", modID, unm) + } + u.SourcesRaw = append(u.SourcesRaw, caddyconfig.JSONModuleObject(source, "source", dynModule, nil)) + } + } + + return nil +} + const matcherPrefix = "@" // Interface guards @@ -1403,4 +1429,5 @@ var ( _ caddyfile.Unmarshaler = (*HTTPTransport)(nil) _ caddyfile.Unmarshaler = (*SRVUpstreams)(nil) _ caddyfile.Unmarshaler = (*AUpstreams)(nil) + _ caddyfile.Unmarshaler = (*MultiUpstreams)(nil) ) diff --git a/modules/caddyhttp/reverseproxy/upstreams.go b/modules/caddyhttp/reverseproxy/upstreams.go index f788dadbc..f49bb289b 100644 --- a/modules/caddyhttp/reverseproxy/upstreams.go +++ b/modules/caddyhttp/reverseproxy/upstreams.go @@ -2,6 +2,7 @@ package reverseproxy import ( "context" + "encoding/json" "fmt" weakrand "math/rand" "net" @@ -18,6 +19,7 @@ import ( func init() { caddy.RegisterModule(SRVUpstreams{}) caddy.RegisterModule(AUpstreams{}) + caddy.RegisterModule(MultiUpstreams{}) } // SRVUpstreams provides upstreams from SRV lookups. @@ -211,11 +213,6 @@ func (sl srvLookup) isFresh() bool { return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh) } -var ( - srvs = make(map[string]srvLookup) - srvsMu sync.RWMutex -) - // AUpstreams provides upstreams from A/AAAA lookups. // Results are cached and refreshed at the configured // refresh interval. @@ -355,6 +352,77 @@ func (al aLookup) isFresh() bool { return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh) } +// MultiUpstreams is a single dynamic upstream source that +// aggregates the results of multiple dynamic upstream sources. +// All configured sources will be queried in order, with their +// results appended to the end of the list. Errors returned +// from individual sources will be logged and the next source +// will continue to be invoked. +// +// This module makes it easy to implement redundant cluster +// failovers, especially in conjunction with the `first` load +// balancing policy: if the first source returns an error or +// no upstreams, the second source's upstreams will be used +// naturally. +type MultiUpstreams struct { + // The list of upstream source modules to get upstreams from. + // They will be queried in order, with their results appended + // in the order they are returned. + SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"` + sources []UpstreamSource + + logger *zap.Logger +} + +// CaddyModule returns the Caddy module information. +func (MultiUpstreams) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "http.reverse_proxy.upstreams.multi", + New: func() caddy.Module { return new(MultiUpstreams) }, + } +} + +func (mu *MultiUpstreams) Provision(ctx caddy.Context) error { + mu.logger = ctx.Logger(mu) + + if mu.SourcesRaw != nil { + mod, err := ctx.LoadModule(mu, "SourcesRaw") + if err != nil { + return fmt.Errorf("loading upstream source modules: %v", err) + } + for _, src := range mod.([]any) { + mu.sources = append(mu.sources, src.(UpstreamSource)) + } + } + + return nil +} + +func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) { + var upstreams []*Upstream + + for i, src := range mu.sources { + select { + case <-r.Context().Done(): + return upstreams, context.Canceled + default: + } + + up, err := src.GetUpstreams(r) + if err != nil { + mu.logger.Error("upstream source returned error", + zap.Int("source_idx", i), + zap.Error(err)) + } else if len(up) == 0 { + mu.logger.Warn("upstream source returned 0 upstreams", zap.Int("source_idx", i)) + } else { + upstreams = append(upstreams, up...) + } + } + + return upstreams, nil +} + // UpstreamResolver holds the set of addresses of DNS resolvers of // upstream addresses type UpstreamResolver struct { @@ -391,6 +459,9 @@ func (u *UpstreamResolver) ParseAddresses() error { } var ( + srvs = make(map[string]srvLookup) + srvsMu sync.RWMutex + aAaaa = make(map[string]aLookup) aAaaaMu sync.RWMutex )