rc: factor server code into rcserver and implement serving objects

If a GET or HEAD request is receivied with a URL parameter of fs then
it will be served from that remote.
This commit is contained in:
Nick Craig-Wood 2018-10-27 18:29:20 +01:00
parent aa9b2c31f4
commit 0bfa9811f7
9 changed files with 253 additions and 199 deletions

View File

@ -31,8 +31,8 @@ import (
"github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/config"
"github.com/ncw/rclone/fs/config/configmap" "github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/object" "github.com/ncw/rclone/fs/object"
"github.com/ncw/rclone/fs/rc"
"github.com/ncw/rclone/fs/rc/rcflags" "github.com/ncw/rclone/fs/rc/rcflags"
"github.com/ncw/rclone/fs/rc/rcserver"
"github.com/ncw/rclone/fstest" "github.com/ncw/rclone/fstest"
"github.com/ncw/rclone/vfs" "github.com/ncw/rclone/vfs"
"github.com/ncw/rclone/vfs/vfsflags" "github.com/ncw/rclone/vfs/vfsflags"
@ -693,7 +693,7 @@ func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) {
func TestInternalChangeSeenAfterRc(t *testing.T) { func TestInternalChangeSeenAfterRc(t *testing.T) {
rcflags.Opt.Enabled = true rcflags.Opt.Enabled = true
rc.Start(&rcflags.Opt) rcserver.Start(&rcflags.Opt)
id := fmt.Sprintf("ticsarc%v", time.Now().Unix()) id := fmt.Sprintf("ticsarc%v", time.Now().Unix())
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, nil) rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, nil)

View File

@ -29,8 +29,8 @@ import (
"github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fspath" "github.com/ncw/rclone/fs/fspath"
fslog "github.com/ncw/rclone/fs/log" fslog "github.com/ncw/rclone/fs/log"
"github.com/ncw/rclone/fs/rc"
"github.com/ncw/rclone/fs/rc/rcflags" "github.com/ncw/rclone/fs/rc/rcflags"
"github.com/ncw/rclone/fs/rc/rcserver"
"github.com/ncw/rclone/lib/atexit" "github.com/ncw/rclone/lib/atexit"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -352,8 +352,8 @@ func initConfig() {
// Write the args for debug purposes // Write the args for debug purposes
fs.Debugf("rclone", "Version %q starting with parameters %q", fs.Version, os.Args) fs.Debugf("rclone", "Version %q starting with parameters %q", fs.Version, os.Args)
// Start the remote control if configured // Start the remote control server if configured
rc.Start(&rcflags.Opt) rcserver.Start(&rcflags.Opt)
// Setup CPU profiling if desired // Setup CPU profiling if desired
if *cpuProfile != "" { if *cpuProfile != "" {

View File

@ -4,8 +4,8 @@ import (
"log" "log"
"github.com/ncw/rclone/cmd" "github.com/ncw/rclone/cmd"
"github.com/ncw/rclone/fs/rc"
"github.com/ncw/rclone/fs/rc/rcflags" "github.com/ncw/rclone/fs/rc/rcflags"
"github.com/ncw/rclone/fs/rc/rcserver"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -35,7 +35,7 @@ the browser when rclone is run.
if len(args) > 0 { if len(args) > 0 {
rcflags.Opt.Files = args[0] rcflags.Opt.Files = args[0]
} }
rc.Start(&rcflags.Opt) rcserver.Start(&rcflags.Opt)
// Run the rc forever // Run the rc forever
select {} select {}
}, },

View File

@ -362,9 +362,31 @@ blob in the body. There are examples of these below using `curl`.
The response will be a JSON blob in the body of the response. This is The response will be a JSON blob in the body of the response. This is
formatted to be reasonably human readable. formatted to be reasonably human readable.
If an error occurs then there will be an HTTP error status (usually ### Error returns
400) and the body of the response will contain a JSON encoded error
object. If an error occurs then there will be an HTTP error status (eg 500)
and the body of the response will contain a JSON encoded error object,
eg
```
{
"error": "Expecting string value for key \"remote\" (was float64)",
"input": {
"fs": "/tmp",
"remote": 3
},
"status": 400
"path": "operations/rmdir",
}
```
The keys in the error response are
- error - error string
- input - the input parameters to the call
- status - the HTTP status code
- path - the path of the call
### CORS
The sever implements basic CORS support and allows all origins for that. The sever implements basic CORS support and allows all origins for that.
The response to a preflight OPTIONS request will echo the requested "Access-Control-Request-Headers" back. The response to a preflight OPTIONS request will echo the requested "Access-Control-Request-Headers" back.

View File

@ -14,16 +14,11 @@ var (
fsNewFs = fs.NewFs // for tests fsNewFs = fs.NewFs // for tests
) )
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh // GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh
func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) { func GetCachedFs(fsString string) (f fs.Fs, err error) {
fsCacheMu.Lock() fsCacheMu.Lock()
defer fsCacheMu.Unlock() defer fsCacheMu.Unlock()
fsString, err := in.GetString(fsName)
if err != nil {
return nil, err
}
f = fsCache[fsString] f = fsCache[fsString]
if f == nil { if f == nil {
f, err = fsNewFs(fsString) f, err = fsNewFs(fsString)
@ -34,6 +29,16 @@ func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
return f, err return f, err
} }
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh
func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
fsString, err := in.GetString(fsName)
if err != nil {
return nil, err
}
return GetCachedFs(fsString)
}
// GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh // GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh
func GetFs(in Params) (f fs.Fs, err error) { func GetFs(in Params) (f fs.Fs, err error) {
return GetFsNamed(in, "fs") return GetFsNamed(in, "fs")

View File

@ -84,7 +84,7 @@ func rcError(in Params) (out Params, err error) {
// List the registered commands // List the registered commands
func rcList(in Params) (out Params, err error) { func rcList(in Params) (out Params, err error) {
out = make(Params) out = make(Params)
out["commands"] = registry.list() out["commands"] = Calls.List()
return out, nil return out, nil
} }
@ -125,7 +125,6 @@ func rcMemStats(in Params) (out Params, err error) {
// Do a garbage collection run // Do a garbage collection run
func rcGc(in Params) (out Params, err error) { func rcGc(in Params) (out Params, err error) {
out = make(Params)
runtime.GC() runtime.GC()
return out, nil return nil, nil
} }

View File

@ -10,15 +10,9 @@ package rc
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"mime"
"net/http"
_ "net/http/pprof" // install the pprof http handlers _ "net/http/pprof" // install the pprof http handlers
"strings"
"github.com/ncw/rclone/cmd/serve/httplib" "github.com/ncw/rclone/cmd/serve/httplib"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"github.com/skratchdot/open-golang/open"
) )
// Options contains options for the remote control server // Options contains options for the remote control server
@ -38,174 +32,9 @@ func init() {
DefaultOpt.HTTPOptions.ListenAddr = "localhost:5572" DefaultOpt.HTTPOptions.ListenAddr = "localhost:5572"
} }
// Start the remote control server if configured
func Start(opt *Options) {
if opt.Enabled {
s := newServer(opt)
go s.serve()
}
}
// server contains everything to run the server
type server struct {
srv *httplib.Server
files http.Handler
}
func newServer(opt *Options) *server {
// Serve on the DefaultServeMux so can have global registrations appear
mux := http.DefaultServeMux
s := &server{
srv: httplib.NewServer(mux, &opt.HTTPOptions),
}
mux.HandleFunc("/", s.handler)
// Add some more mime types which are often missing
mime.AddExtensionType(".wasm", "application/wasm")
mime.AddExtensionType(".js", "application/javascript")
// File handling
s.files = http.NewServeMux()
if opt.Files != "" {
fs.Logf(nil, "Serving files from %q", opt.Files)
s.files = http.FileServer(http.Dir(opt.Files))
}
return s
}
// serve runs the http server - doesn't return
func (s *server) serve() {
err := s.srv.Serve()
if err != nil {
fs.Errorf(nil, "Opening listener: %v", err)
}
fs.Logf(nil, "Serving remote control on %s", s.srv.URL())
// Open the files in the browser if set
if s.files != nil {
_ = open.Start(s.srv.URL())
}
s.srv.Wait()
}
// WriteJSON writes JSON in out to w // WriteJSON writes JSON in out to w
func WriteJSON(w io.Writer, out Params) error { func WriteJSON(w io.Writer, out Params) error {
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
return enc.Encode(out) return enc.Encode(out)
} }
// writeError writes a formatted error to the output
func writeError(path string, in Params, w http.ResponseWriter, err error, status int) {
fs.Errorf(nil, "rc: %q: error: %v", path, err)
// Adjust the error return for some well known errors
switch errors.Cause(err) {
case fs.ErrorDirNotFound, fs.ErrorObjectNotFound:
status = http.StatusNotFound
}
w.WriteHeader(status)
err = WriteJSON(w, Params{
"error": err.Error(),
"input": in,
})
if err != nil {
// can't return the error at this point
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
}
}
// handler reads incoming requests and dispatches them
func (s *server) handler(w http.ResponseWriter, r *http.Request) {
path := strings.Trim(r.URL.Path, "/")
w.Header().Add("Access-Control-Allow-Origin", "*")
// echo back access control headers client needs
reqAccessHeaders := r.Header.Get("Access-Control-Request-Headers")
w.Header().Add("Access-Control-Allow-Headers", reqAccessHeaders)
switch r.Method {
case "POST":
s.handlePost(w, r, path)
case "OPTIONS":
s.handleOptions(w, r, path)
case "GET":
s.handleGet(w, r, path)
default:
writeError(path, nil, w, errors.Errorf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
return
}
}
func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string) {
// Parse the POST and URL parameters into r.Form, for others r.Form will be empty value
err := r.ParseForm()
if err != nil {
writeError(path, nil, w, errors.Wrap(err, "failed to parse form/URL parameters"), http.StatusBadRequest)
return
}
// Read the POST and URL parameters into in
in := make(Params)
for k, vs := range r.Form {
if len(vs) > 0 {
in[k] = vs[len(vs)-1]
}
}
// Parse a JSON blob from the input
if r.Header.Get("Content-Type") == "application/json" {
err := json.NewDecoder(r.Body).Decode(&in)
if err != nil {
writeError(path, in, w, errors.Wrap(err, "failed to read input JSON"), http.StatusBadRequest)
return
}
}
// Find the call
call := registry.get(path)
if call == nil {
writeError(path, in, w, errors.Errorf("couldn't find method %q", path), http.StatusMethodNotAllowed)
return
}
// Check to see if it is async or not
isAsync, err := in.GetBool("_async")
if err != nil {
writeError(path, in, w, err, http.StatusBadRequest)
return
}
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
var out Params
if isAsync {
out, err = StartJob(call.Fn, in)
} else {
out, err = call.Fn(in)
}
if err != nil {
writeError(path, in, w, err, http.StatusInternalServerError)
return
}
if out == nil {
out = make(Params)
}
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
err = WriteJSON(w, out)
if err != nil {
// can't return the error at this point
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
}
}
func (s *server) handleOptions(w http.ResponseWriter, r *http.Request, path string) {
w.WriteHeader(http.StatusOK)
}
func (s *server) handleGet(w http.ResponseWriter, r *http.Request, path string) {
if s.files == nil {
w.WriteHeader(http.StatusNotFound)
return
}
s.files.ServeHTTP(w, r)
}

199
fs/rc/rcserver/rcserver.go Normal file
View File

@ -0,0 +1,199 @@
// Package rcserver implements the HTTP endpoint to serve the remote control
package rcserver
import (
"encoding/json"
"mime"
"net/http"
"strings"
"github.com/ncw/rclone/cmd/serve/httplib"
"github.com/ncw/rclone/cmd/serve/httplib/serve"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/rc"
"github.com/pkg/errors"
"github.com/skratchdot/open-golang/open"
)
// Start the remote control server if configured
func Start(opt *rc.Options) {
if opt.Enabled {
s := newServer(opt)
go s.serve()
}
}
// server contains everything to run the server
type server struct {
srv *httplib.Server
files http.Handler
}
func newServer(opt *rc.Options) *server {
// Serve on the DefaultServeMux so can have global registrations appear
mux := http.DefaultServeMux
s := &server{
srv: httplib.NewServer(mux, &opt.HTTPOptions),
}
mux.HandleFunc("/", s.handler)
// Add some more mime types which are often missing
_ = mime.AddExtensionType(".wasm", "application/wasm")
_ = mime.AddExtensionType(".js", "application/javascript")
// File handling
if opt.Files != "" {
fs.Logf(nil, "Serving files from %q", opt.Files)
s.files = http.FileServer(http.Dir(opt.Files))
}
return s
}
// serve runs the http server - doesn't return
func (s *server) serve() {
err := s.srv.Serve()
if err != nil {
fs.Errorf(nil, "Opening listener: %v", err)
}
fs.Logf(nil, "Serving remote control on %s", s.srv.URL())
// Open the files in the browser if set
if s.files != nil {
_ = open.Start(s.srv.URL())
}
s.srv.Wait()
}
// writeError writes a formatted error to the output
func writeError(path string, in rc.Params, w http.ResponseWriter, err error, status int) {
fs.Errorf(nil, "rc: %q: error: %v", path, err)
// Adjust the error return for some well known errors
errOrig := errors.Cause(err)
switch {
case errOrig == fs.ErrorDirNotFound || errOrig == fs.ErrorObjectNotFound:
status = http.StatusNotFound
case rc.IsErrParamInvalid(err) || rc.IsErrParamNotFound(err):
status = http.StatusBadRequest
}
w.WriteHeader(status)
err = rc.WriteJSON(w, rc.Params{
"status": status,
"error": err.Error(),
"input": in,
"path": path,
})
if err != nil {
// can't return the error at this point
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
}
}
// handler reads incoming requests and dispatches them
func (s *server) handler(w http.ResponseWriter, r *http.Request) {
path := strings.Trim(r.URL.Path, "/")
w.Header().Add("Access-Control-Allow-Origin", "*")
// echo back access control headers client needs
reqAccessHeaders := r.Header.Get("Access-Control-Request-Headers")
w.Header().Add("Access-Control-Allow-Headers", reqAccessHeaders)
switch r.Method {
case "POST":
s.handlePost(w, r, path)
case "OPTIONS":
s.handleOptions(w, r, path)
case "GET":
s.handleGet(w, r, path)
default:
writeError(path, nil, w, errors.Errorf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
return
}
}
func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string) {
// Parse the POST and URL parameters into r.Form, for others r.Form will be empty value
err := r.ParseForm()
if err != nil {
writeError(path, nil, w, errors.Wrap(err, "failed to parse form/URL parameters"), http.StatusBadRequest)
return
}
// Read the POST and URL parameters into in
in := make(rc.Params)
for k, vs := range r.Form {
if len(vs) > 0 {
in[k] = vs[len(vs)-1]
}
}
// Parse a JSON blob from the input
if r.Header.Get("Content-Type") == "application/json" {
err := json.NewDecoder(r.Body).Decode(&in)
if err != nil {
writeError(path, in, w, errors.Wrap(err, "failed to read input JSON"), http.StatusBadRequest)
return
}
}
// Find the call
call := rc.Calls.Get(path)
if call == nil {
writeError(path, in, w, errors.Errorf("couldn't find method %q", path), http.StatusMethodNotAllowed)
return
}
// Check to see if it is async or not
isAsync, err := in.GetBool("_async")
if rc.NotErrParamNotFound(err) {
writeError(path, in, w, err, http.StatusBadRequest)
return
}
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
var out rc.Params
if isAsync {
out, err = rc.StartJob(call.Fn, in)
} else {
out, err = call.Fn(in)
}
if err != nil {
writeError(path, in, w, err, http.StatusInternalServerError)
return
}
if out == nil {
out = make(rc.Params)
}
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
err = rc.WriteJSON(w, out)
if err != nil {
// can't return the error at this point
fs.Errorf(nil, "rc: failed to write JSON output: %v", err)
}
}
func (s *server) handleOptions(w http.ResponseWriter, r *http.Request, path string) {
w.WriteHeader(http.StatusOK)
}
func (s *server) handleGet(w http.ResponseWriter, r *http.Request, path string) {
// if we have an &fs parameter we are serving from a different fs
fsName := r.URL.Query().Get("fs")
if fsName != "" {
f, err := rc.GetCachedFs(fsName)
if err != nil {
writeError(path, nil, w, errors.Wrap(err, "failed to make Fs"), http.StatusInternalServerError)
return
}
o, err := f.NewObject(path)
if err != nil {
writeError(path, nil, w, errors.Wrap(err, "failed to find object"), http.StatusInternalServerError)
return
}
serve.Object(w, r, o)
} else if s.files == nil {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
} else {
s.files.ServeHTTP(w, r)
}
}

View File

@ -36,7 +36,7 @@ func NewRegistry() *Registry {
} }
// Add a call to the registry // Add a call to the registry
func (r *Registry) add(call Call) { func (r *Registry) Add(call Call) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
call.Path = strings.Trim(call.Path, "/") call.Path = strings.Trim(call.Path, "/")
@ -45,15 +45,15 @@ func (r *Registry) add(call Call) {
r.call[call.Path] = &call r.call[call.Path] = &call
} }
// get a Call from a path or nil // Get a Call from a path or nil
func (r *Registry) get(path string) *Call { func (r *Registry) Get(path string) *Call {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
return r.call[path] return r.call[path]
} }
// get a list of all calls in alphabetical order // List of all calls in alphabetical order
func (r *Registry) list() (out []*Call) { func (r *Registry) List() (out []*Call) {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
var keys []string var keys []string
@ -67,10 +67,10 @@ func (r *Registry) list() (out []*Call) {
return out return out
} }
// The global registry // Calls is the global registry of Call objects
var registry = NewRegistry() var Calls = NewRegistry()
// Add a function to the global registry // Add a function to the global registry
func Add(call Call) { func Add(call Call) {
registry.add(call) Calls.Add(call)
} }