From bac54de9ebb23ab406ec30c3f2221e9c80f10183 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sat, 24 Sep 2016 06:29:23 +0100 Subject: [PATCH] Fastcgi persistent fix (#1129) * Support for configurable connection pool. * ensure positive integer pool size config --- caddyhttp/fastcgi/dialer.go | 60 +++++++++++++++++++++++ caddyhttp/fastcgi/fastcgi.go | 81 ++++++++----------------------- caddyhttp/fastcgi/fastcgi_test.go | 7 +-- caddyhttp/fastcgi/fcgiclient.go | 10 ++-- caddyhttp/fastcgi/setup.go | 17 +++++++ 5 files changed, 105 insertions(+), 70 deletions(-) create mode 100644 caddyhttp/fastcgi/dialer.go diff --git a/caddyhttp/fastcgi/dialer.go b/caddyhttp/fastcgi/dialer.go new file mode 100644 index 000000000..df33fab62 --- /dev/null +++ b/caddyhttp/fastcgi/dialer.go @@ -0,0 +1,60 @@ +package fastcgi + +import "sync" + +type dialer interface { + Dial() (*FCGIClient, error) + Close(*FCGIClient) error +} + +// basicDialer is a basic dialer that wraps default fcgi functions. +type basicDialer struct { + network, address string +} + +func (b basicDialer) Dial() (*FCGIClient, error) { return Dial(b.network, b.address) } +func (b basicDialer) Close(c *FCGIClient) error { return c.Close() } + +// persistentDialer keeps a pool of fcgi connections. +// connections are not closed after use, rather added back to the pool for reuse. +type persistentDialer struct { + size int + network string + address string + pool []*FCGIClient + sync.Mutex +} + +func (p *persistentDialer) Dial() (*FCGIClient, error) { + p.Lock() + + // connection is available, return first one. + if len(p.pool) > 0 { + client := p.pool[0] + p.pool = p.pool[1:] + p.Unlock() + + return client, nil + } + + p.Unlock() + + // no connection available, create new one + return Dial(p.network, p.address) +} + +func (p *persistentDialer) Close(client *FCGIClient) error { + p.Lock() + if len(p.pool) < p.size { + // pool is not full yet, add connection for reuse + p.pool = append(p.pool, client) + p.Unlock() + + return nil + } + + p.Unlock() + + // otherwise, close the connection. + return client.Close() +} diff --git a/caddyhttp/fastcgi/fastcgi.go b/caddyhttp/fastcgi/fastcgi.go index 3a5628913..5b40b9367 100644 --- a/caddyhttp/fastcgi/fastcgi.go +++ b/caddyhttp/fastcgi/fastcgi.go @@ -12,29 +12,10 @@ import ( "path/filepath" "strconv" "strings" - "sync" "github.com/mholt/caddy/caddyhttp/httpserver" ) -// persistent fastcgi connections - -type serialClient struct { - // for read/write serialisation - sync.Mutex - backend *FCGIClient -} -type concurrentPersistentConnectionsMap struct { - // for thread safe acces to the map - sync.Mutex - clientMap map[string]*serialClient -} - -var persistentConnections = &(concurrentPersistentConnectionsMap{clientMap: make(map[string]*serialClient)}) - -// UsePersistentFcgiConnections TODO: add an option in Caddyfile and pass it down to here -var UsePersistentFcgiConnections = true - // Handler is a middleware type that can handle requests as a FastCGI client. type Handler struct { Next httpserver.Handler @@ -91,26 +72,9 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) } // Connect to FastCGI gateway - network, address := rule.parseAddress() - - var fcgiBackend *FCGIClient - var client *serialClient - reuse := false - if UsePersistentFcgiConnections { - persistentConnections.Lock() - client, reuse = persistentConnections.clientMap[network+address] - persistentConnections.Unlock() - } - - if reuse { - client.Lock() - defer client.Unlock() - fcgiBackend = client.backend - } else { - fcgiBackend, err = Dial(network, address) - if err != nil { - return http.StatusBadGateway, err - } + fcgiBackend, err := rule.dialer.Dial() + if err != nil { + return http.StatusBadGateway, err } var resp *http.Response @@ -126,18 +90,12 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength) } + defer fcgiBackend.Close() + if err != nil && err != io.EOF { return http.StatusBadGateway, err } - if UsePersistentFcgiConnections { - persistentConnections.Lock() - persistentConnections.clientMap[network+address] = &(serialClient{backend: fcgiBackend}) - persistentConnections.Unlock() - } else { - defer fcgiBackend.Close() - } - // Write response header writeHeader(w, resp) @@ -151,10 +109,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) if fcgiBackend.stderr.Len() != 0 { // Remove trailing newline, error logger already does this. err = LogError(strings.TrimSuffix(fcgiBackend.stderr.String(), "\n")) - persistentConnections.Lock() - delete(persistentConnections.clientMap, network+address) - persistentConnections.Unlock() - fcgiBackend.Close() } // Normally we would return the status code if it is an error status (>= 400), @@ -170,28 +124,28 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) return h.Next.ServeHTTP(w, r) } -// parseAddress returns the network and address of r. +// parseAddress returns the network and address of fcgiAddress. // The first string is the network, "tcp" or "unix", implied from the scheme and address. -// The second string is r.Address, with scheme prefixes removed. +// The second string is fcgiAddress, with scheme prefixes removed. // The two returned strings can be used as parameters to the Dial() function. -func (r Rule) parseAddress() (string, string) { +func parseAddress(fcgiAddress string) (string, string) { // check if address has tcp scheme explicitly set - if strings.HasPrefix(r.Address, "tcp://") { - return "tcp", r.Address[len("tcp://"):] + if strings.HasPrefix(fcgiAddress, "tcp://") { + return "tcp", fcgiAddress[len("tcp://"):] } // check if address has fastcgi scheme explicitly set - if strings.HasPrefix(r.Address, "fastcgi://") { - return "tcp", r.Address[len("fastcgi://"):] + if strings.HasPrefix(fcgiAddress, "fastcgi://") { + return "tcp", fcgiAddress[len("fastcgi://"):] } // check if unix socket - if trim := strings.HasPrefix(r.Address, "unix"); strings.HasPrefix(r.Address, "/") || trim { + if trim := strings.HasPrefix(fcgiAddress, "unix"); strings.HasPrefix(fcgiAddress, "/") || trim { if trim { - return "unix", r.Address[len("unix:"):] + return "unix", fcgiAddress[len("unix:"):] } - return "unix", r.Address + return "unix", fcgiAddress } // default case, a plain tcp address with no scheme - return "tcp", r.Address + return "tcp", fcgiAddress } func writeHeader(w http.ResponseWriter, r *http.Response) { @@ -340,6 +294,9 @@ type Rule struct { // Ignored paths IgnoredSubPaths []string + + // FCGI dialer + dialer dialer } // canSplit checks if path can split into two based on rule.SplitPath. diff --git a/caddyhttp/fastcgi/fastcgi_test.go b/caddyhttp/fastcgi/fastcgi_test.go index 8b813c246..3d23c44c5 100644 --- a/caddyhttp/fastcgi/fastcgi_test.go +++ b/caddyhttp/fastcgi/fastcgi_test.go @@ -24,9 +24,10 @@ func TestServeHTTP(t *testing.T) { w.Write([]byte(body)) })) + network, address := parseAddress(listener.Addr().String()) handler := Handler{ Next: nil, - Rules: []Rule{{Path: "/", Address: listener.Addr().String()}}, + Rules: []Rule{{Path: "/", Address: listener.Addr().String(), dialer: basicDialer{network, address}}}, } r, err := http.NewRequest("GET", "/", nil) if err != nil { @@ -64,10 +65,10 @@ func TestRuleParseAddress(t *testing.T) { } for _, entry := range getClientTestTable { - if actualnetwork, _ := entry.rule.parseAddress(); actualnetwork != entry.expectednetwork { + if actualnetwork, _ := parseAddress(entry.rule.Address); actualnetwork != entry.expectednetwork { t.Errorf("Unexpected network for address string %v. Got %v, expected %v", entry.rule.Address, actualnetwork, entry.expectednetwork) } - if _, actualaddress := entry.rule.parseAddress(); actualaddress != entry.expectedaddress { + if _, actualaddress := parseAddress(entry.rule.Address); actualaddress != entry.expectedaddress { t.Errorf("Unexpected parsed address for address string %v. Got %v, expected %v", entry.rule.Address, actualaddress, entry.expectedaddress) } } diff --git a/caddyhttp/fastcgi/fcgiclient.go b/caddyhttp/fastcgi/fcgiclient.go index 667ce1018..c06caf73d 100644 --- a/caddyhttp/fastcgi/fcgiclient.go +++ b/caddyhttp/fastcgi/fcgiclient.go @@ -193,9 +193,9 @@ func Dial(network, address string) (fcgi *FCGIClient, err error) { return DialWithDialer(network, address, net.Dialer{}) } -// Close closes fcgi connnection -func (c *FCGIClient) Close() { - c.rwc.Close() +// Close closes fcgi connnection. +func (c *FCGIClient) Close() error { + return c.rwc.Close() } func (c *FCGIClient) writeRecord(recType uint8, content []byte) (err error) { @@ -408,11 +408,11 @@ func (c *FCGIClient) Do(p map[string]string, req io.Reader) (r io.Reader, err er // clientCloser is a io.ReadCloser. It wraps a io.Reader with a Closer // that closes FCGIClient connection. type clientCloser struct { - *FCGIClient + f *FCGIClient io.Reader } -func (f clientCloser) Close() error { return f.rwc.Close() } +func (c clientCloser) Close() error { return c.f.Close() } // Request returns a HTTP Response with Header and Body // from fcgi responder diff --git a/caddyhttp/fastcgi/setup.go b/caddyhttp/fastcgi/setup.go index c7650faa0..9b8bad6c3 100644 --- a/caddyhttp/fastcgi/setup.go +++ b/caddyhttp/fastcgi/setup.go @@ -4,6 +4,7 @@ import ( "errors" "net/http" "path/filepath" + "strconv" "github.com/mholt/caddy" "github.com/mholt/caddy/caddyhttp/httpserver" @@ -72,6 +73,9 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) { } } + network, address := parseAddress(rule.Address) + rule.dialer = basicDialer{network: network, address: address} + for c.NextBlock() { switch c.Val() { case "ext": @@ -102,6 +106,19 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) { return rules, c.ArgErr() } rule.IgnoredSubPaths = ignoredPaths + case "pool": + if !c.NextArg() { + return rules, c.ArgErr() + } + pool, err := strconv.Atoi(c.Val()) + if err != nil { + return rules, err + } + if pool >= 0 { + rule.dialer = &persistentDialer{size: pool, network: network, address: address} + } else { + return rules, c.Errf("positive integer expected, found %d", pool) + } } }