// Package websocket implements a WebSocket server by executing
// a command and piping its input and output through the WebSocket
// connection.
package websocket

import (
	"bufio"
	"bytes"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/gorilla/websocket"
	"github.com/mholt/caddy/middleware"
)

const (
	// Time allowed to write a message to the peer. Used as the write
	// deadline for data messages and for ping control frames.
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the peer. The read
	// deadline is pushed out by this amount every time a pong arrives.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait
	// so that a pong can arrive before the read deadline expires.
	pingPeriod = (pongWait * 9) / 10

	// Maximum message size allowed from peer. Applied to the connection
	// as its read limit.
	maxMessageSize = 1024 * 1024 * 10 // 10 MB default.
)

var (
	// GatewayInterface is the dialect of CGI being used by the server
	// to communicate with the script. See CGI spec, 4.1.4.
	// It is exported to the child process as GATEWAY_INTERFACE.
	GatewayInterface string

	// ServerSoftware is the name and version of the information server
	// software making the CGI request. See CGI spec, 4.1.17.
	// It is exported to the child process as SERVER_SOFTWARE.
	ServerSoftware string
)

type (
	// WebSocket is a type that holds configuration for the
	// websocket middleware generally, like a list of all the
	// websocket endpoints.
	WebSocket struct {
		// Next is the next HTTP handler in the chain for when the path doesn't match
		Next middleware.Handler

		// Sockets holds all the web socket endpoint configurations
		Sockets []Config
	}

	// Config holds the configuration for a single websocket
	// endpoint which may serve multiple websocket connections.
	Config struct {
		// Path is the request path that selects this endpoint.
		Path string
		// Command is the program executed for each connection.
		Command string
		// Arguments are the arguments passed to Command.
		Arguments []string
		Respawn bool // TODO: Not used, but parser supports it until we decide on it
	}
)

// ServeHTTP converts the HTTP request to a WebSocket connection and serves it up.
func (ws WebSocket) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { for _, sockconfig := range ws.Sockets { if middleware.Path(r.URL.Path).Matches(sockconfig.Path) { return serveWS(w, r, &sockconfig) } } // Didn't match a websocket path, so pass-thru return ws.Next.ServeHTTP(w, r) } // serveWS is used for setting and upgrading the HTTP connection to a websocket connection. // It also spawns the child process that is associated with matched HTTP path/url. func serveWS(w http.ResponseWriter, r *http.Request, config *Config) (int, error) { upgrader := websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return http.StatusBadRequest, err } defer conn.Close() cmd := exec.Command(config.Command, config.Arguments...) stdout, err := cmd.StdoutPipe() if err != nil { return http.StatusBadGateway, err } defer stdout.Close() stdin, err := cmd.StdinPipe() if err != nil { return http.StatusBadGateway, err } defer stdin.Close() metavars, err := buildEnv(cmd.Path, r) if err != nil { return http.StatusBadGateway, err } cmd.Env = metavars if err := cmd.Start(); err != nil { return http.StatusBadGateway, err } done := make(chan struct{}) go pumpStdout(conn, stdout, done) pumpStdin(conn, stdin) stdin.Close() // close stdin to end the process if err := cmd.Process.Signal(os.Interrupt); err != nil { // signal an interrupt to kill the process return http.StatusInternalServerError, err } select { case <-done: case <-time.After(time.Second): // terminate with extreme prejudice. if err := cmd.Process.Signal(os.Kill); err != nil { return http.StatusInternalServerError, err } <-done } // not sure what we want to do here. // status for an "exited" process is greater // than 0, but isn't really an error per se. // just going to ignore it for now. 
cmd.Wait() return 0, nil } // buildEnv creates the meta-variables for the child process according // to the CGI 1.1 specification: http://tools.ietf.org/html/rfc3875#section-4.1 // cmdPath should be the path of the command being run. // The returned string slice can be set to the command's Env property. func buildEnv(cmdPath string, r *http.Request) (metavars []string, err error) { if !strings.Contains(r.RemoteAddr, ":") { r.RemoteAddr += ":" } remoteHost, remotePort, err := net.SplitHostPort(r.RemoteAddr) if err != nil { return } if !strings.Contains(r.Host, ":") { r.Host += ":" } serverHost, serverPort, err := net.SplitHostPort(r.Host) if err != nil { return } metavars = []string{ `AUTH_TYPE=`, // Not used `CONTENT_LENGTH=`, // Not used `CONTENT_TYPE=`, // Not used `GATEWAY_INTERFACE=` + GatewayInterface, `PATH_INFO=`, // TODO `PATH_TRANSLATED=`, // TODO `QUERY_STRING=` + r.URL.RawQuery, `REMOTE_ADDR=` + remoteHost, `REMOTE_HOST=` + remoteHost, // Host lookups are slow - don't do them `REMOTE_IDENT=`, // Not used `REMOTE_PORT=` + remotePort, `REMOTE_USER=`, // Not used, `REQUEST_METHOD=` + r.Method, `REQUEST_URI=` + r.RequestURI, `SCRIPT_NAME=` + cmdPath, // path of the program being executed `SERVER_NAME=` + serverHost, `SERVER_PORT=` + serverPort, `SERVER_PROTOCOL=` + r.Proto, `SERVER_SOFTWARE=` + ServerSoftware, } // Add each HTTP header to the environment as well for header, values := range r.Header { value := strings.Join(values, ", ") header = strings.ToUpper(header) header = strings.Replace(header, "-", "_", -1) value = strings.Replace(value, "\n", " ", -1) metavars = append(metavars, "HTTP_"+header+"="+value) } return } // pumpStdin handles reading data from the websocket connection and writing // it to stdin of the process. func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) { // Setup our connection's websocket ping/pong handlers from our const values. 
defer conn.Close() conn.SetReadLimit(maxMessageSize) conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, message, err := conn.ReadMessage() if err != nil { break } message = append(message, '\n') if _, err := stdin.Write(message); err != nil { break } } } // pumpStdout handles reading data from stdout of the process and writing // it to websocket connection. func pumpStdout(conn *websocket.Conn, stdout io.Reader, done chan struct{}) { go pinger(conn, done) defer func() { conn.Close() close(done) // make sure to close the pinger when we are done. }() s := bufio.NewScanner(stdout) for s.Scan() { conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := conn.WriteMessage(websocket.TextMessage, bytes.TrimSpace(s.Bytes())); err != nil { break } } if s.Err() != nil { conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, s.Err().Error()), time.Time{}) } } // pinger simulates the websocket to keep it alive with ping messages. func pinger(conn *websocket.Conn, done chan struct{}) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { // blocking loop with select to wait for stimulation. select { case <-ticker.C: if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil { conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, err.Error()), time.Time{}) return } case <-done: return // clean up this routine. } } }