From 8cb4e9085273e9678d73b4ebce51fbcb18852bf9 Mon Sep 17 00:00:00 2001 From: Sebastian Schmittner Date: Wed, 28 Sep 2016 02:12:22 +0200 Subject: [PATCH] Add fix and tests for FastCGI persistent connections (#1134) * keep fastcgi connection open * poor mans serialisation to make up for the lack of demuxing * pointing includes to echse's repo * Revert "pointing includes to echse's repo" This reverts commit 281daad8d437ba88fadd65fae5af2da99d449bea. * switch for persistent fcgi connections on/off added * fixing ineffectual assignments * camel case instead of _ * only activate persistent sockets on windows (and some naming conventions/cleanup) * gitfm import sorting * Revert "fixing ineffectual assignments" This reverts commit 79760344e7b1231e59a2867246e7689b05e92e18. # Conflicts: # caddyhttp/staticfiles/fileserver.go * added another mutex and deleting map entries. thx to mholts QA comments! * thinking about it, this RW lock was not a good idea here * thread safety * I keep learning about mutexs in go * some cosmetics * adding persistant fastcgi connections switch to directive * Support for configurable connection pool. * ensure positive integer pool size config * abisofts pool fix + nicer logging for the fastcgi_test * abisoft wants to have dialer comparison in _test instead of next to struct * Do not put dead connections back into pool * Fix fastcgi header error * Do not put dead connections back into pool * some code style improvements from the discussion in https://github.com/mholt/caddy/pull/1134 * abisofts naming convention --- caddyhttp/fastcgi/dialer.go | 1 - caddyhttp/fastcgi/fastcgi.go | 5 +- caddyhttp/fastcgi/fastcgi_test.go | 96 +++++++++++++++++++++++++++++++ caddyhttp/fastcgi/fcgiclient.go | 8 ++- caddyhttp/fastcgi/setup_test.go | 82 +++++++++++++++++++++++++- 5 files changed, 184 insertions(+), 8 deletions(-) diff --git a/caddyhttp/fastcgi/dialer.go b/caddyhttp/fastcgi/dialer.go index df33fab62..58a8f1563 100644 --- a/caddyhttp/fastcgi/dialer.go +++ b/caddyhttp/fastcgi/dialer.go @@ -27,7 +27,6 @@ type persistentDialer struct { func (p *persistentDialer) Dial() (*FCGIClient, error) { p.Lock() - // connection is available, return first one. if len(p.pool) > 0 { client := p.pool[0] diff --git a/caddyhttp/fastcgi/fastcgi.go b/caddyhttp/fastcgi/fastcgi.go index 5b40b9367..c3e1cab95 100644 --- a/caddyhttp/fastcgi/fastcgi.go +++ b/caddyhttp/fastcgi/fastcgi.go @@ -90,8 +90,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength) } - defer fcgiBackend.Close() - if err != nil && err != io.EOF { return http.StatusBadGateway, err } @@ -105,6 +103,8 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) return http.StatusBadGateway, err } + defer rule.dialer.Close(fcgiBackend) + // Log any stderr output from upstream if fcgiBackend.stderr.Len() != 0 { // Remove trailing newline, error logger already does this. @@ -269,6 +269,7 @@ func (h Handler) buildEnv(r *http.Request, rule Rule, fpath string) (map[string] } // Rule represents a FastCGI handling rule. +// It is parsed from the fastcgi directive in the Caddyfile, see setup.go. type Rule struct { // The base path to match. Required. Path string diff --git a/caddyhttp/fastcgi/fastcgi_test.go b/caddyhttp/fastcgi/fastcgi_test.go index 3d23c44c5..46f06c9bc 100644 --- a/caddyhttp/fastcgi/fastcgi_test.go +++ b/caddyhttp/fastcgi/fastcgi_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "net/url" "strconv" + "sync" "testing" ) @@ -51,6 +52,101 @@ func TestServeHTTP(t *testing.T) { } } +// connectionCounter in fact is a listener with an added counter to keep track +// of the number of accepted connections. +type connectionCounter struct { + net.Listener + sync.Mutex + counter int +} + +func (l *connectionCounter) Accept() (net.Conn, error) { + l.Lock() + l.counter++ + l.Unlock() + return l.Listener.Accept() +} + +// TestPersistent ensures that persistent +// as well as the non-persistent fastCGI servers +// send the answers corresnponding to the correct request. +// It also checks the number of tcp connections used. +func TestPersistent(t *testing.T) { + numberOfRequests := 32 + + for _, poolsize := range []int{0, 1, 5, numberOfRequests} { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Unable to create listener for test: %v", err) + } + + listener := &connectionCounter{l, *new(sync.Mutex), 0} + + // this fcgi server replies with the request URL + go fcgi.Serve(listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body := "This answers a request to " + r.URL.Path + bodyLenStr := strconv.Itoa(len(body)) + + w.Header().Set("Content-Length", bodyLenStr) + w.Write([]byte(body)) + })) + + network, address := parseAddress(listener.Addr().String()) + handler := Handler{ + Next: nil, + Rules: []Rule{{Path: "/", Address: listener.Addr().String(), dialer: &persistentDialer{size: poolsize, network: network, address: address}}}, + } + + var semaphore sync.WaitGroup + serialMutex := new(sync.Mutex) + + serialCounter := 0 + parallelCounter := 0 + // make some serial followed by some + // parallel requests to challenge the handler + for _, serialize := range []bool{true, false, false, false} { + if serialize { + serialCounter++ + } else { + parallelCounter++ + } + semaphore.Add(numberOfRequests) + + for i := 0; i < numberOfRequests; i++ { + go func(i int, serialize bool) { + defer semaphore.Done() + if serialize { + serialMutex.Lock() + defer serialMutex.Unlock() + } + r, err := http.NewRequest("GET", "/"+strconv.Itoa(i), nil) + if err != nil { + t.Errorf("Unable to create request: %v", err) + } + w := httptest.NewRecorder() + + status, err := handler.ServeHTTP(w, r) + + if status != 0 { + t.Errorf("Handler(pool: %v) return status %v", poolsize, status) + } + if err != nil { + t.Errorf("Handler(pool: %v) Error: %v", poolsize, err) + } + want := "This answers a request to /" + strconv.Itoa(i) + if got := w.Body.String(); got != want { + t.Errorf("Expected response from handler(pool: %v) to be '%s', got: '%s'", poolsize, want, got) + } + }(i, serialize) + } //next request + semaphore.Wait() + } // next set of requests (serial/parallel) + + listener.Close() + t.Logf("The pool: %v test used %v tcp connections to answer %v * %v serial and %v * %v parallel requests.", poolsize, listener.counter, serialCounter, numberOfRequests, parallelCounter, numberOfRequests) + } // next handler (persistent/non-persistent) +} + func TestRuleParseAddress(t *testing.T) { getClientTestTable := []struct { rule *Rule diff --git a/caddyhttp/fastcgi/fcgiclient.go b/caddyhttp/fastcgi/fcgiclient.go index c06caf73d..d3773c9b4 100644 --- a/caddyhttp/fastcgi/fcgiclient.go +++ b/caddyhttp/fastcgi/fcgiclient.go @@ -138,7 +138,7 @@ func (rec *record) read(r io.Reader) (buf []byte, err error) { return } if rec.h.Version != 1 { - err = errors.New("fcgi: invalid header version") + err = errInvalidHeaderVersion return } if rec.h.Type == EndRequest { @@ -358,7 +358,9 @@ func (w *streamReader) Read(p []byte) (n int, err error) { rec := &record{} var buf []byte buf, err = rec.read(w.c.rwc) - if err != nil { + if err == errInvalidHeaderVersion { + continue + } else if err != nil { return } // standard error output @@ -561,3 +563,5 @@ func (c *FCGIClient) PostFile(p map[string]string, data url.Values, file map[str // Checks whether chunked is part of the encodings stack func chunked(te []string) bool { return len(te) > 0 && te[0] == "chunked" } + +var errInvalidHeaderVersion = errors.New("fcgi: invalid header version") diff --git a/caddyhttp/fastcgi/setup_test.go b/caddyhttp/fastcgi/setup_test.go index 123b4b9e8..b9fd6b0d0 100644 --- a/caddyhttp/fastcgi/setup_test.go +++ b/caddyhttp/fastcgi/setup_test.go @@ -2,6 +2,7 @@ package fastcgi import ( "fmt" + "reflect" "testing" "github.com/mholt/caddy" @@ -35,7 +36,34 @@ func TestSetup(t *testing.T) { } +func (p *persistentDialer) Equals(q *persistentDialer) bool { + if p.size != q.size { + return false + } + if p.network != q.network { + return false + } + if p.address != q.address { + return false + } + + if len(p.pool) != len(q.pool) { + return false + } + for i, client := range p.pool { + if client != q.pool[i] { + return false + } + } + // ignore mutex state + return true +} + func TestFastcgiParse(t *testing.T) { + defaultAddress := "127.0.0.1:9001" + network, address := parseAddress(defaultAddress) + t.Logf("Address '%v' was parsed to network '%v' and address '%v'", defaultAddress, network, address) + tests := []struct { inputFastcgiConfig string shouldErr bool @@ -48,19 +76,21 @@ func TestFastcgiParse(t *testing.T) { Address: "127.0.0.1:9000", Ext: ".php", SplitPath: ".php", + dialer: basicDialer{network: "tcp", address: "127.0.0.1:9000"}, IndexFiles: []string{"index.php"}, }}}, - {`fastcgi / 127.0.0.1:9001 { + {`fastcgi / ` + defaultAddress + ` { split .html }`, false, []Rule{{ Path: "/", - Address: "127.0.0.1:9001", + Address: defaultAddress, Ext: "", SplitPath: ".html", + dialer: basicDialer{network: network, address: address}, IndexFiles: []string{}, }}}, - {`fastcgi / 127.0.0.1:9001 { + {`fastcgi / ` + defaultAddress + ` { split .html except /admin /user }`, @@ -69,9 +99,32 @@ func TestFastcgiParse(t *testing.T) { Address: "127.0.0.1:9001", Ext: "", SplitPath: ".html", + dialer: basicDialer{network: network, address: address}, IndexFiles: []string{}, IgnoredSubPaths: []string{"/admin", "/user"}, }}}, + {`fastcgi / ` + defaultAddress + ` { + pool 0 + }`, + false, []Rule{{ + Path: "/", + Address: defaultAddress, + Ext: "", + SplitPath: "", + dialer: &persistentDialer{size: 0, network: network, address: address}, + IndexFiles: []string{}, + }}}, + {`fastcgi / ` + defaultAddress + ` { + pool 5 + }`, + false, []Rule{{ + Path: "/", + Address: defaultAddress, + Ext: "", + SplitPath: "", + dialer: &persistentDialer{size: 5, network: network, address: address}, + IndexFiles: []string{}, + }}}, } for i, test := range tests { actualFastcgiConfigs, err := fastcgiParse(caddy.NewTestController("http", test.inputFastcgiConfig)) @@ -107,6 +160,29 @@ func TestFastcgiParse(t *testing.T) { i, j, test.expectedFastcgiConfig[j].SplitPath, actualFastcgiConfig.SplitPath) } + if reflect.TypeOf(actualFastcgiConfig.dialer) != reflect.TypeOf(test.expectedFastcgiConfig[j].dialer) { + t.Errorf("Test %d expected %dth FastCGI dialer to be of type %T, but got %T", + i, j, test.expectedFastcgiConfig[j].dialer, actualFastcgiConfig.dialer) + } else { + equal := true + switch actual := actualFastcgiConfig.dialer.(type) { + case basicDialer: + equal = actualFastcgiConfig.dialer == test.expectedFastcgiConfig[j].dialer + case *persistentDialer: + if expected, ok := test.expectedFastcgiConfig[j].dialer.(*persistentDialer); ok { + equal = actual.Equals(expected) + } else { + equal = false + } + default: + t.Errorf("Unkonw dialer type %T", actualFastcgiConfig.dialer) + } + if !equal { + t.Errorf("Test %d expected %dth FastCGI dialer to be %v, but got %v", + i, j, test.expectedFastcgiConfig[j].dialer, actualFastcgiConfig.dialer) + } + } + if fmt.Sprint(actualFastcgiConfig.IndexFiles) != fmt.Sprint(test.expectedFastcgiConfig[j].IndexFiles) { t.Errorf("Test %d expected %dth FastCGI IndexFiles to be %s , but got %s", i, j, test.expectedFastcgiConfig[j].IndexFiles, actualFastcgiConfig.IndexFiles)