From 4b358ff43ba941e34b1c66134b756b4e8a484822 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Wed, 20 Apr 2022 17:57:43 +0100
Subject: [PATCH] combine: backend to combine multiple remotes in one
 directory tree

Fixes #5600
---
 README.md                                |  13 +
 backend/all/all.go                       |   1 +
 backend/combine/combine.go               | 941 +++++++++++++++++++++++
 backend/combine/combine_internal_test.go |  94 +++
 backend/combine/combine_test.go          |  79 ++
 bin/make_manual.py                       |   1 +
 docs/content/_index.md                   |  14 +
 docs/content/combine.md                  | 156 ++++
 docs/content/docs.md                     |   1 +
 docs/layouts/chrome/navbar.html          |   1 +
 fstest/test_all/config.yaml              |   3 +
 11 files changed, 1304 insertions(+)
 create mode 100644 backend/combine/combine.go
 create mode 100644 backend/combine/combine_internal_test.go
 create mode 100644 backend/combine/combine_test.go
 create mode 100644 docs/content/combine.md

diff --git a/README.md b/README.md
index 0235323b5..206517733 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,19 @@ Rclone *("rsync for cloud storage")* is a command-line program to sync files and
 
 Please see [the full list of all storage providers and their features](https://rclone.org/overview/)
 
+### Virtual storage providers
+
+These backends adapt or modify other storage providers:
+
+  * Alias: rename existing remotes [:page_facing_up:](https://rclone.org/alias/)
+  * Cache: cache remotes (DEPRECATED) [:page_facing_up:](https://rclone.org/cache/)
+  * Chunker: split large files [:page_facing_up:](https://rclone.org/chunker/)
+  * Combine: combine multiple remotes into a directory tree [:page_facing_up:](https://rclone.org/combine/)
+  * Compress: compress files [:page_facing_up:](https://rclone.org/compress/)
+  * Crypt: encrypt files [:page_facing_up:](https://rclone.org/crypt/)
+  * Hasher: hash files [:page_facing_up:](https://rclone.org/hasher/)
+  * Union: join multiple remotes to work together [:page_facing_up:](https://rclone.org/union/)
+
 ## Features
 
   * MD5/SHA-1 hashes checked at all times for file integrity
diff --git a/backend/all/all.go b/backend/all/all.go
index a93909eca..44414b287 100644
--- a/backend/all/all.go
+++ b/backend/all/all.go
@@ -9,6 +9,7 @@ import (
 	_ "github.com/rclone/rclone/backend/box"
 	_ "github.com/rclone/rclone/backend/cache"
 	_ "github.com/rclone/rclone/backend/chunker"
+	_ "github.com/rclone/rclone/backend/combine"
 	_ "github.com/rclone/rclone/backend/compress"
 	_ "github.com/rclone/rclone/backend/crypt"
 	_ "github.com/rclone/rclone/backend/drive"
diff --git a/backend/combine/combine.go b/backend/combine/combine.go
new file mode 100644
index 000000000..c3c5ff38a
--- /dev/null
+++ b/backend/combine/combine.go
@@ -0,0 +1,941 @@
+// Package combine implements a backend to combine multiple remotes in a directory tree
+package combine
+
+/*
+   Have API to add/remove branches in the combine
+*/
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/cache"
+	"github.com/rclone/rclone/fs/config/configmap"
+	"github.com/rclone/rclone/fs/config/configstruct"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/fs/operations"
+	"github.com/rclone/rclone/fs/walk"
+	"golang.org/x/sync/errgroup"
+)
+
+// Register with Fs
+func init() {
+	fsi := &fs.RegInfo{
+		Name:        "combine",
+		Description: "Combine several remotes into one",
+		NewFs:       NewFs,
+		Options: []fs.Option{{
+			Name: "upstreams",
+			Help: `Upstreams for combining
+
+These should be in the form
+
+    dir=remote:path dir2=remote2:path
+
+The part before the = is the root directory and the part after is the
+remote to put there.
+
+Embedded spaces can be added using quotes
+
+    "dir=remote:path with space" "dir2=remote2:path with space"
+
+`,
+			Required: true,
+			Default:  fs.SpaceSepList(nil),
+		}},
+	}
+	fs.Register(fsi)
+}
+
+// Options defines the configuration for this backend
+type Options struct {
+	Upstreams fs.SpaceSepList `config:"upstreams"`
+}
+
+// Fs represents a combine of upstreams
+type Fs struct {
+	name      string               // name of this remote
+	features  *fs.Features         // optional features
+	opt       Options              // options for this Fs
+	root      string               // the path we are working on
+	hashSet   hash.Set             // common hashes
+	when      time.Time            // directory times
+	upstreams map[string]*upstream // map of upstreams
+}
+
+// adjustment stores the info to add a prefix to a path or chop characters off
+type adjustment struct {
+	root            string
+	rootSlash       string
+	mountpoint      string
+	mountpointSlash string
+}
+
+// newAdjustment makes a new path adjustment adjusting between mountpoint and root
+//
+// mountpoint is the point the upstream is mounted and root is the combine root
+func newAdjustment(root, mountpoint string) (a adjustment) {
+	return adjustment{
+		root:            root,
+		rootSlash:       root + "/",
+		mountpoint:      mountpoint,
+		mountpointSlash: mountpoint + "/",
+	}
+}
+
+var errNotUnderRoot = errors.New("file not under root")
+
+// do makes the adjustment on s, mapping an upstream path into a combine path
+func (a *adjustment) do(s string) (string, error) {
+	absPath := join(a.mountpoint, s)
+	if a.root == "" {
+		return absPath, nil
+	}
+	if absPath == a.root {
+		return "", nil
+	}
+	if !strings.HasPrefix(absPath, a.rootSlash) {
+		return "", errNotUnderRoot
+	}
+	return absPath[len(a.rootSlash):], nil
+}
+
+// undo makes the adjustment on s, mapping a combine path into an upstream path
+func (a *adjustment) undo(s string) (string, error) {
+	absPath := join(a.root, s)
+	if absPath == a.mountpoint {
+		return "", nil
+	}
+	if !strings.HasPrefix(absPath, a.mountpointSlash) {
+		return "", errNotUnderRoot
+	}
+	return absPath[len(a.mountpointSlash):], nil
+}
+
+// upstream represents an upstream Fs
+type upstream struct {
+	f              fs.Fs
+	parent         *Fs
+	dir            string     // directory the upstream is mounted
+	pathAdjustment adjustment // how to fiddle with the path
+}
+
+// Create an upstream from the directory it is mounted on and the remote
+func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
+	uFs, err := cache.Get(ctx, remote)
+	if err == fs.ErrorIsFile {
+		return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
+	}
+	u := &upstream{
+		f:              uFs,
+		parent:         f,
+		dir:            dir,
+		pathAdjustment: newAdjustment(f.root, dir),
+	}
+	return u, nil
+}
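+
+// Illustration of the path adjustment (example values only): with
+// combine root "" and an upstream mounted at "images", do maps the
+// upstream path "file.jpg" to the combine path "images/file.jpg" and
+// undo maps "images/file.jpg" back to "file.jpg". See the internal
+// tests for more mappings.
+
+// NewFs constructs an Fs from the path.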
+//
+// The returned Fs is the actual Fs, referenced by remote in the config
+func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
+	// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
+	// Parse config into Options struct
+	opt := new(Options)
+	err = configstruct.Set(m, opt)
+	if err != nil {
+		return nil, err
+	}
+	if len(opt.Upstreams) == 0 {
+		return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
+	}
+	for _, u := range opt.Upstreams {
+		if strings.HasPrefix(u, name+":") {
+			return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
+		}
+	}
+	isDir := false
+	for strings.HasSuffix(root, "/") {
+		root = root[:len(root)-1]
+		isDir = true
+	}
+
+	f := &Fs{
+		name:      name,
+		root:      root,
+		opt:       *opt,
+		upstreams: make(map[string]*upstream, len(opt.Upstreams)),
+		when:      time.Now(),
+	}
+
+	g, gCtx := errgroup.WithContext(ctx)
+	var mu sync.Mutex
+	for _, upstream := range opt.Upstreams {
+		upstream := upstream
+		g.Go(func() (err error) {
+			equal := strings.IndexRune(upstream, '=')
+			if equal < 0 {
+				return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
+			}
+			dir, remote := upstream[:equal], upstream[equal+1:]
+			if dir == "" {
+				return fmt.Errorf("empty dir in upstream definition %q", upstream)
+			}
+			if remote == "" {
+				return fmt.Errorf("empty remote in upstream definition %q", upstream)
+			}
+			if strings.IndexRune(dir, '/') >= 0 {
+				return fmt.Errorf("dirs can't contain / (yet): %q", dir)
+			}
+			u, err := f.newUpstream(gCtx, dir, remote)
+			if err != nil {
+				return err
+			}
+			mu.Lock()
+			f.upstreams[dir] = u
+			mu.Unlock()
+			return nil
+		})
+	}
+	err = g.Wait()
+	if err != nil {
+		return nil, err
+	}
+	// check features
+	var features = (&fs.Features{
+		CaseInsensitive:         true,
+		DuplicateFiles:          false,
+		ReadMimeType:            true,
+		WriteMimeType:           true,
+		CanHaveEmptyDirectories: true,
+		BucketBased:             true,
+		SetTier:                 true,
+		GetTier:                 true,
+	}).Fill(ctx, f)
+	canMove := true
+	for _, u := range f.upstreams {
+		features = features.Mask(ctx, u.f) // Mask all upstream fs
+		if !operations.CanServerSideMove(u.f) {
+			canMove = false
+		}
+	}
+	// We can move if all remotes support Move or Copy
+	if canMove {
+		features.Move = f.Move
+	}
+
+	// Enable ListR when upstreams either support ListR or are local,
+	// but not when all upstreams are local
+	if features.ListR == nil {
+		for _, u := range f.upstreams {
+			if u.f.Features().ListR != nil {
+				features.ListR = f.ListR
+			} else if !u.f.Features().IsLocal {
+				features.ListR = nil
+				break
+			}
+		}
+	}
+
+	// Enable Purge when any upstreams support it
+	if features.Purge == nil {
+		for _, u := range f.upstreams {
+			if u.f.Features().Purge != nil {
+				features.Purge = f.Purge
+				break
+			}
+		}
+	}
+
+	// Enable Shutdown when any upstreams support it
+	if features.Shutdown == nil {
+		for _, u := range f.upstreams {
+			if u.f.Features().Shutdown != nil {
+				features.Shutdown = f.Shutdown
+				break
+			}
+		}
+	}
+
+	// Enable DirCacheFlush when any upstreams support it
+	if features.DirCacheFlush == nil {
+		for _, u := range f.upstreams {
+			if u.f.Features().DirCacheFlush != nil {
+				features.DirCacheFlush = f.DirCacheFlush
+				break
+			}
+		}
+	}
+
+	// Enable ChangeNotify when any upstreams support it
+	if features.ChangeNotify == nil {
+		for _, u := range f.upstreams {
+			if u.f.Features().ChangeNotify != nil {
+				features.ChangeNotify = f.ChangeNotify
+				break
+			}
+		}
+	}
+
+	f.features = features
+
+	// Get common intersection of hashes
+	var hashSet hash.Set
+	var first = true
+	for _, u := range f.upstreams {
+		if first {
+			hashSet = u.f.Hashes()
+			first = false
+		} else {
+			hashSet = hashSet.Overlap(u.f.Hashes())
+		}
+	}
+	f.hashSet = hashSet
+
+	// Check to see if the root is actually a file
+	if f.root != "" && !isDir {
+		_, err := f.NewObject(ctx, "")
+		if err != nil {
+			if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
+				// File doesn't exist or is a directory so return old f
+				return f, nil
+			}
+			return nil, err
+		}
+
+		// Root points to an existing file so adjust it to its parent directory
+		f.root = path.Dir(f.root)
+		if f.root == "." {
+			f.root = ""
+		}
+		// Adjust path adjustment to remove leaf
+		for _, u := range f.upstreams {
+			u.pathAdjustment = newAdjustment(f.root, u.dir)
+		}
+		return f, fs.ErrorIsFile
+	}
+	return f, nil
+}
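+
+// A minimal config for this backend looks something like this (the
+// remote names are illustrative placeholders):
+//
+//	[combined]
+//	type = combine
+//	upstreams = images=s3:imagesbucket files=drive:important/files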
+
+// Run a function over all the upstreams in parallel
+func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
+	g, gCtx := errgroup.WithContext(ctx)
+	for _, u := range f.upstreams {
+		u := u
+		g.Go(func() (err error) {
+			return fn(gCtx, u)
+		})
+	}
+	return g.Wait()
+}
+
+// join the elements together but unlike path.Join return an empty
+// string instead of "." and strip any leading /
+func join(elem ...string) string {
+	result := path.Join(elem...)
+	if result == "." {
+		return ""
+	}
+	if len(result) > 0 && result[0] == '/' {
+		result = result[1:]
+	}
+	return result
+}
+
+// find the upstream for the remote passed in, returning the upstream and the adjusted path
+func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
+	// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
+	for _, u := range f.upstreams {
+		uRemote, err = u.pathAdjustment.undo(remote)
+		if err == nil {
+			return u, uRemote, nil
+		}
+	}
+	return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
+}
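+
+// Illustration (assuming upstreams "images=s3:imagesbucket
+// files=drive:important/files" and an empty combine root):
+// findUpstream("images/2022/pic.jpg") returns the s3 upstream and the
+// adjusted path "2022/pic.jpg"; the files upstream is skipped because
+// undo returns errNotUnderRoot for it.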
+
+// Name of the remote (as passed into NewFs)
+func (f *Fs) Name() string {
+	return f.name
+}
+
+// Root of the remote (as passed into NewFs)
+func (f *Fs) Root() string {
+	return f.root
+}
+
+// String converts this Fs to a string
+func (f *Fs) String() string {
+	return fmt.Sprintf("combine root '%s'", f.root)
+}
+
+// Features returns the optional features of this Fs
+func (f *Fs) Features() *fs.Features {
+	return f.features
+}
+
+// Rmdir removes the directory given by dir
+func (f *Fs) Rmdir(ctx context.Context, dir string) error {
+	// The root always exists
+	if f.root == "" && dir == "" {
+		return nil
+	}
+	u, uRemote, err := f.findUpstream(dir)
+	if err != nil {
+		return err
+	}
+	return u.f.Rmdir(ctx, uRemote)
+}
+
+// Hashes returns the hash sets supported by all of the upstreams
+func (f *Fs) Hashes() hash.Set {
+	return f.hashSet
+}
+
+// Mkdir makes the directory given by dir
+func (f *Fs) Mkdir(ctx context.Context, dir string) error {
+	// The root always exists
+	if f.root == "" && dir == "" {
+		return nil
+	}
+	u, uRemote, err := f.findUpstream(dir)
+	if err != nil {
+		return err
+	}
+	return u.f.Mkdir(ctx, uRemote)
+}
+
+// purge the upstream or fallback to a slow way
+func (u *upstream) purge(ctx context.Context, dir string) (err error) {
+	if do := u.f.Features().Purge; do != nil {
+		err = do(ctx, dir)
+	} else {
+		err = operations.Purge(ctx, u.f, dir)
+	}
+	return err
+}
+
+// Purge all files in the directory
+//
+// Implement this if you have a way of deleting all the files
+// quicker than just running Remove() on the result of List()
+//
+// Return an error if it doesn't exist
+func (f *Fs) Purge(ctx context.Context, dir string) error {
+	if f.root == "" && dir == "" {
+		return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
+			return u.purge(ctx, "")
+		})
+	}
+	u, uRemote, err := f.findUpstream(dir)
+	if err != nil {
+		return err
+	}
+	return u.purge(ctx, uRemote)
+}
+
+// Copy src to this remote using server-side copy operations.
+//
+// This is stored with the remote path given
+//
+// It returns the destination Object and a possible error
+//
+// Will only be called if src.Fs().Name() == f.Name()
+//
+// If it isn't possible then return fs.ErrorCantCopy
+func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
+	srcObj, ok := src.(*Object)
+	if !ok {
+		fs.Debugf(src, "Can't copy - not same remote type")
+		return nil, fs.ErrorCantCopy
+	}
+
+	dstU, dstRemote, err := f.findUpstream(remote)
+	if err != nil {
+		return nil, err
+	}
+
+	do := dstU.f.Features().Copy
+	if do == nil {
+		return nil, fs.ErrorCantCopy
+	}
+
+	o, err := do(ctx, srcObj.Object, dstRemote)
+	if err != nil {
+		return nil, err
+	}
+
+	return dstU.newObject(o), nil
+}
+
+// Move src to this remote using server-side move operations.
+//
+// This is stored with the remote path given
+//
+// It returns the destination Object and a possible error
+//
+// Will only be called if src.Fs().Name() == f.Name()
+//
+// If it isn't possible then return fs.ErrorCantMove
+func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
+	srcObj, ok := src.(*Object)
+	if !ok {
+		fs.Debugf(src, "Can't move - not same remote type")
+		return nil, fs.ErrorCantMove
+	}
+
+	dstU, dstRemote, err := f.findUpstream(remote)
+	if err != nil {
+		return nil, err
+	}
+
+	do := dstU.f.Features().Move
+	useCopy := false
+	if do == nil {
+		do = dstU.f.Features().Copy
+		if do == nil {
+			return nil, fs.ErrorCantMove
+		}
+		useCopy = true
+	}
+
+	o, err := do(ctx, srcObj.Object, dstRemote)
+	if err != nil {
+		return nil, err
+	}
+
+	// If we did Copy then remove the source object
+	if useCopy {
+		err = srcObj.Remove(ctx)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return dstU.newObject(o), nil
+}
+
+// DirMove moves src, srcRemote to this remote at dstRemote
+// using server-side move operations.
+//
+// Will only be called if src.Fs().Name() == f.Name()
+//
+// If it isn't possible then return fs.ErrorCantDirMove
+//
+// If destination exists then return fs.ErrorDirExists
+func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
+	// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
+	srcFs, ok := src.(*Fs)
+	if !ok {
+		fs.Debugf(src, "Can't move directory - not same remote type")
+		return fs.ErrorCantDirMove
+	}
+
+	dstU, dstURemote, err := f.findUpstream(dstRemote)
+	if err != nil {
+		return err
+	}
+
+	srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
+	if err != nil {
+		return err
+	}
+
+	do := dstU.f.Features().DirMove
+	if do == nil {
+		return fs.ErrorCantDirMove
+	}
+
+	fs.Debugf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
+	return do(ctx, srcU.f, srcURemote, dstURemote)
+}
+
+// ChangeNotify calls the passed function with a path
+// that has had changes. If the implementation
+// uses polling, it should adhere to the given interval.
+// At least one value will be written to the channel, +// specifying the initial value and updated values might +// follow. A 0 Duration should pause the polling. +// The ChangeNotify implementation must empty the channel +// regularly. When the channel gets closed, the implementation +// should stop polling and release resources. +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) { + var uChans []chan time.Duration + + for _, u := range f.upstreams { + u := u + if do := u.f.Features().ChangeNotify; do != nil { + ch := make(chan time.Duration) + uChans = append(uChans, ch) + wrappedNotifyFunc := func(path string, entryType fs.EntryType) { + newPath, err := u.pathAdjustment.do(path) + if err != nil { + fs.Logf(f, "ChangeNotify: unable to process %q: %s", path, err) + return + } + fs.Debugf(f, "ChangeNotify: path %q entryType %d", newPath, entryType) + notifyFunc(newPath, entryType) + } + do(ctx, wrappedNotifyFunc, ch) + } + } + + go func() { + for i := range ch { + for _, c := range uChans { + c <- i + } + } + for _, c := range uChans { + close(c) + } + }() +} + +// DirCacheFlush resets the directory cache - used in testing +// as an optional interface +func (f *Fs) DirCacheFlush() { + ctx := context.Background() + _ = f.multithread(ctx, func(ctx context.Context, u *upstream) error { + if do := u.f.Features().DirCacheFlush; do != nil { + do() + } + return nil + }) +} + +func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) { + srcPath := src.Remote() + u, uRemote, err := f.findUpstream(srcPath) + if err != nil { + return nil, err + } + uSrc := operations.NewOverrideRemote(src, uRemote) + var o fs.Object + if stream { + o, err = u.f.Features().PutStream(ctx, in, uSrc, options...) + } else { + o, err = u.f.Put(ctx, in, uSrc, options...) + } + if err != nil { + return nil, err + } + return u.newObject(o), nil +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, false, options...) + default: + return nil, err + } +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, true, options...) 
+	default:
+		return nil, err
+	}
+}
+
+// About gets quota information from the Fs
+func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
+	usage := &fs.Usage{
+		Total:   new(int64),
+		Used:    new(int64),
+		Trashed: new(int64),
+		Other:   new(int64),
+		Free:    new(int64),
+		Objects: new(int64),
+	}
+	for _, u := range f.upstreams {
+		doAbout := u.f.Features().About
+		if doAbout == nil {
+			continue
+		}
+		usg, err := doAbout(ctx)
+		if errors.Is(err, fs.ErrorDirNotFound) {
+			continue
+		}
+		if err != nil {
+			return nil, err
+		}
+		if usg.Total != nil && usage.Total != nil {
+			*usage.Total += *usg.Total
+		} else {
+			usage.Total = nil
+		}
+		if usg.Used != nil && usage.Used != nil {
+			*usage.Used += *usg.Used
+		} else {
+			usage.Used = nil
+		}
+		if usg.Trashed != nil && usage.Trashed != nil {
+			*usage.Trashed += *usg.Trashed
+		} else {
+			usage.Trashed = nil
+		}
+		if usg.Other != nil && usage.Other != nil {
+			*usage.Other += *usg.Other
+		} else {
+			usage.Other = nil
+		}
+		if usg.Free != nil && usage.Free != nil {
+			*usage.Free += *usg.Free
+		} else {
+			usage.Free = nil
+		}
+		if usg.Objects != nil && usage.Objects != nil {
+			*usage.Objects += *usg.Objects
+		} else {
+			usage.Objects = nil
+		}
+	}
+	return usage, nil
+}
+
+// wrapEntries wraps entries for this upstream
+func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
+	for i, entry := range entries {
+		switch x := entry.(type) {
+		case fs.Object:
+			entries[i] = u.newObject(x)
+		case fs.Directory:
+			newDir := fs.NewDirCopy(ctx, x)
+			newPath, err := u.pathAdjustment.do(newDir.Remote())
+			if err != nil {
+				return nil, err
+			}
+			newDir.SetRemote(newPath)
+			entries[i] = newDir
+		default:
+			return nil, fmt.Errorf("unknown entry type %T", entry)
+		}
+	}
+	return entries, nil
+}
+
+// List the objects and directories in dir into entries. The
+// entries can be returned in any order but should be for a
+// complete directory.
+//
+// dir should be "" to list the root, and should not have
+// trailing slashes.
+//
+// This should return ErrDirNotFound if the directory isn't
+// found.
+func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
+	// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
+	if f.root == "" && dir == "" {
+		entries = make(fs.DirEntries, 0, len(f.upstreams))
+		for combineDir := range f.upstreams {
+			d := fs.NewDir(combineDir, f.when)
+			entries = append(entries, d)
+		}
+		return entries, nil
+	}
+	u, uRemote, err := f.findUpstream(dir)
+	if err != nil {
+		return nil, err
+	}
+	entries, err = u.f.List(ctx, uRemote)
+	if err != nil {
+		return nil, err
+	}
+	return u.wrapEntries(ctx, entries)
+}
+
+// ListR lists the objects and directories of the Fs starting
+// from dir recursively into out.
+//
+// dir should be "" to start from the root, and should not
+// have trailing slashes.
+//
+// This should return ErrDirNotFound if the directory isn't
+// found.
+//
+// It should call callback for each tranche of entries read.
+// These need not be returned in any particular order. If
+// callback returns an error then the listing will stop
+// immediately.
+//
+// Don't implement this unless you have a more efficient way
+// of listing recursively than doing a directory traversal.
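+//
+// Here the listing of the root is synthesised from the upstream
+// directory names, and listing anything below the root is delegated to
+// the relevant upstream.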
+func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { + // defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err) + if f.root == "" && dir == "" { + rootEntries, err := f.List(ctx, "") + if err != nil { + return err + } + err = callback(rootEntries) + if err != nil { + return err + } + var mu sync.Mutex + syncCallback := func(entries fs.DirEntries) error { + mu.Lock() + defer mu.Unlock() + return callback(entries) + } + err = f.multithread(ctx, func(ctx context.Context, u *upstream) error { + return f.ListR(ctx, u.dir, syncCallback) + }) + if err != nil { + return err + } + return nil + } + u, uRemote, err := f.findUpstream(dir) + if err != nil { + return err + } + wrapCallback := func(entries fs.DirEntries) error { + entries, err := u.wrapEntries(ctx, entries) + if err != nil { + return err + } + return callback(entries) + } + if do := u.f.Features().ListR; do != nil { + err = do(ctx, uRemote, wrapCallback) + } else { + err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapCallback) + } + if err == fs.ErrorDirNotFound { + err = nil + } + return err +} + +// NewObject creates a new remote combine file object +func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + u, uRemote, err := f.findUpstream(remote) + if err != nil { + return nil, err + } + if uRemote == "" || strings.HasSuffix(uRemote, "/") { + return nil, fs.ErrorIsDir + } + o, err := u.f.NewObject(ctx, uRemote) + if err != nil { + return nil, err + } + return u.newObject(o), nil +} + +// Precision is the greatest Precision of all upstreams +func (f *Fs) Precision() time.Duration { + var greatestPrecision time.Duration + for _, u := range f.upstreams { + uPrecision := u.f.Precision() + if uPrecision > greatestPrecision { + greatestPrecision = uPrecision + } + } + return greatestPrecision +} + +// Shutdown the backend, closing any background tasks and any +// cached connections. 
+func (f *Fs) Shutdown(ctx context.Context) error { + return f.multithread(ctx, func(ctx context.Context, u *upstream) error { + if do := u.f.Features().Shutdown; do != nil { + return do(ctx) + } + return nil + }) +} + +// Object describes a wrapped Object +// +// This is a wrapped Object which knows its path prefix +type Object struct { + fs.Object + u *upstream +} + +func (u *upstream) newObject(o fs.Object) *Object { + return &Object{ + Object: o, + u: u, + } +} + +// Fs returns read only access to the Fs that this object is part of +func (o *Object) Fs() fs.Info { + return o.u.parent +} + +// String returns the remote path +func (o *Object) String() string { + return o.Remote() +} + +// Remote returns the remote path +func (o *Object) Remote() string { + newPath, err := o.u.pathAdjustment.do(o.Object.String()) + if err != nil { + fs.Errorf(o, "Bad object: %v", err) + return err.Error() + } + return newPath +} + +// MimeType returns the content type of the Object if known +func (o *Object) MimeType(ctx context.Context) (mimeType string) { + if do, ok := o.Object.(fs.MimeTyper); ok { + mimeType = do.MimeType(ctx) + } + return mimeType +} + +// UnWrap returns the Object that this Object is wrapping or +// nil if it isn't wrapping anything +func (o *Object) UnWrap() fs.Object { + return o.Object +} + +// Check the interfaces are satisfied +var ( + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) + _ fs.Abouter = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) + _ fs.Shutdowner = (*Fs)(nil) +) diff --git a/backend/combine/combine_internal_test.go b/backend/combine/combine_internal_test.go new file mode 100644 index 000000000..5c889bb8c --- /dev/null +++ b/backend/combine/combine_internal_test.go @@ -0,0 +1,94 @@ +package combine + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAdjustmentDo(t *testing.T) { + for _, test := range []struct { + root string + mountpoint string + in string + want string + wantErr error + }{ + { + root: "", + mountpoint: "mountpoint", + in: "path/to/file.txt", + want: "mountpoint/path/to/file.txt", + }, + { + root: "mountpoint", + mountpoint: "mountpoint", + in: "path/to/file.txt", + want: "path/to/file.txt", + }, + { + root: "mountpoint/path", + mountpoint: "mountpoint", + in: "path/to/file.txt", + want: "to/file.txt", + }, + { + root: "mountpoint/path", + mountpoint: "mountpoint", + in: "wrongpath/to/file.txt", + want: "", + wantErr: errNotUnderRoot, + }, + } { + what := fmt.Sprintf("%+v", test) + a := newAdjustment(test.root, test.mountpoint) + got, gotErr := a.do(test.in) + assert.Equal(t, test.wantErr, gotErr) + assert.Equal(t, test.want, got, what) + } + +} + +func TestAdjustmentUndo(t *testing.T) { + for _, test := range []struct { + root string + mountpoint string + in string + want string + wantErr error + }{ + { + root: "", + mountpoint: "mountpoint", + in: "mountpoint/path/to/file.txt", + want: "path/to/file.txt", + }, + { + root: "mountpoint", + mountpoint: "mountpoint", + in: "path/to/file.txt", + want: "path/to/file.txt", + }, + { + root: "mountpoint/path", + mountpoint: "mountpoint", + in: "to/file.txt", + want: "path/to/file.txt", + }, + { + root: "wrongmountpoint/path", + mountpoint: "mountpoint", + in: "to/file.txt", + want: "", + wantErr: errNotUnderRoot, + }, + } { + what := fmt.Sprintf("%+v", test) + a := newAdjustment(test.root, 
test.mountpoint) + got, gotErr := a.undo(test.in) + assert.Equal(t, test.wantErr, gotErr) + assert.Equal(t, test.want, got, what) + } + +} diff --git a/backend/combine/combine_test.go b/backend/combine/combine_test.go new file mode 100644 index 000000000..c6274c562 --- /dev/null +++ b/backend/combine/combine_test.go @@ -0,0 +1,79 @@ +// Test Combine filesystem interface +package combine_test + +import ( + "testing" + + _ "github.com/rclone/rclone/backend/local" + _ "github.com/rclone/rclone/backend/memory" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" +) + +// TestIntegration runs integration tests against the remote +func TestIntegration(t *testing.T) { + if *fstest.RemoteName == "" { + t.Skip("Skipping as -remote not set") + } + fstests.Run(t, &fstests.Opt{ + RemoteName: *fstest.RemoteName, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestLocal(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + dirs := MakeTestDirs(t, 3) + upstreams := "dir1=" + dirs[0] + " dir2=" + dirs[1] + " dir3=" + dirs[2] + name := "TestCombineLocal" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":dir1", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "combine"}, + {Name: name, Key: "upstreams", Value: upstreams}, + }, + }) +} + +func TestMemory(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + upstreams := "dir1=:memory:dir1 dir2=:memory:dir2 dir3=:memory:dir3" + name := "TestCombineMemory" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":dir1", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "combine"}, + {Name: name, Key: "upstreams", Value: upstreams}, + }, + }) +} + +func TestMixed(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + dirs := MakeTestDirs(t, 2) + upstreams := "dir1=" + dirs[0] + " dir2=" + dirs[1] + " dir3=:memory:dir3" + name := "TestCombineMixed" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":dir1", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "combine"}, + {Name: name, Key: "upstreams", Value: upstreams}, + }, + }) +} + +// MakeTestDirs makes directories in /tmp for testing +func MakeTestDirs(t *testing.T, n int) (dirs []string) { + for i := 1; i <= n; i++ { + dir := t.TempDir() + dirs = append(dirs, dir) + } + return dirs +} diff --git a/bin/make_manual.py b/bin/make_manual.py index 0afa5f2b5..335f4c814 100755 --- a/bin/make_manual.py +++ b/bin/make_manual.py @@ -38,6 +38,7 @@ docs = [ "sharefile.md", "crypt.md", "compress.md", + "combine.md", "dropbox.md", "filefabric.md", "ftp.md", diff --git a/docs/content/_index.md b/docs/content/_index.md index 653ce53dd..c7a213bfa 100644 --- a/docs/content/_index.md +++ b/docs/content/_index.md @@ -170,6 +170,20 @@ WebDAV or S3, that work out of the box.) 
 {{< provider name="The local filesystem" home="/local/" config="/local/" end="true">}}
 {{< /provider_list >}}
 
+## Virtual providers
+
+These backends adapt or modify other storage providers:
+
+{{< provider name="Alias: rename existing remotes" home="/alias/" config="/alias/" >}}
+{{< provider name="Cache: cache remotes (DEPRECATED)" home="/cache/" config="/cache/" >}}
+{{< provider name="Chunker: split large files" home="/chunker/" config="/chunker/" >}}
+{{< provider name="Combine: combine multiple remotes into a directory tree" home="/combine/" config="/combine/" >}}
+{{< provider name="Compress: compress files" home="/compress/" config="/compress/" >}}
+{{< provider name="Crypt: encrypt files" home="/crypt/" config="/crypt/" >}}
+{{< provider name="Hasher: hash files" home="/hasher/" config="/hasher/" >}}
+{{< provider name="Union: join multiple remotes to work together" home="/union/" config="/union/" >}}
+
 
 ## Links
 
   * {{< icon "fa fa-home" >}} [Home page](https://rclone.org/)
diff --git a/docs/content/combine.md b/docs/content/combine.md
new file mode 100644
index 000000000..98e174a04
--- /dev/null
+++ b/docs/content/combine.md
@@ -0,0 +1,156 @@
+---
+title: "Combine"
+description: "Combine several remotes into one"
+---
+
+# {{< icon "fa fa-folder-plus" >}} Combine
+
+The `combine` backend joins remotes together into a single directory
+tree.
+
+For example you might have a remote for images on one provider:
+
+```
+$ rclone tree s3:imagesbucket
+/
+├── image1.jpg
+└── image2.jpg
+```
+
+And a remote for files on another:
+
+```
+$ rclone tree drive:important/files
+/
+├── file1.txt
+└── file2.txt
+```
+
+The `combine` backend can join these together into a synthetic
+directory structure like this:
+
+```
+$ rclone tree combined:
+/
+├── files
+│   ├── file1.txt
+│   └── file2.txt
+└── images
+    ├── image1.jpg
+    └── image2.jpg
+```
+
+You'd do this by specifying an `upstreams` parameter in the config
+like this
+
+    upstreams = images=s3:imagesbucket files=drive:important/files
+
+During the initial setup with `rclone config` you will specify the
+upstream remotes as a space separated list. The upstream remotes can
+be local paths or other remotes.
+
+## Configuration
+
+Here is an example of how to make a combine called `remote` for the
+example above. First run:
+
+     rclone config
+
+This will guide you through an interactive setup process:
+
+```
+No remotes found, make a new one?
+n) New remote
+s) Set configuration password
+q) Quit config
+n/s/q> n
+name> remote
+Option Storage.
+Type of storage to configure.
+Choose a number from below, or type in your own value.
+...
+XX / Combine several remotes into one
+   \ (combine)
+...
+Storage> combine
+Option upstreams.
+Upstreams for combining
+These should be in the form
+    dir=remote:path dir2=remote2:path
+The part before the = is the root directory and the part after is the
+remote to put there.
+Embedded spaces can be added using quotes
+    "dir=remote:path with space" "dir2=remote2:path with space"
+Enter a fs.SpaceSepList value.
+upstreams> images=s3:imagesbucket files=drive:important/files
+--------------------
+[remote]
+type = combine
+upstreams = images=s3:imagesbucket files=drive:important/files
+--------------------
+y) Yes this is OK (default)
+e) Edit this remote
+d) Delete this remote
+y/e/d> y
+```
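+
+Once configured you can use the remote like any other. For example, to
+list the combined root and copy a local file into one of the upstreams
+(output shown for the example upstreams configured above):
+
+```
+$ rclone lsf remote:
+files/
+images/
+$ rclone copy /path/to/image3.jpg remote:images
+```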
+
+### Configuring for Google Drive Shared Drives
+
+Rclone has a convenience feature for making a combine backend for all
+the shared drives you have access to.
+
+Assuming your main (non shared drive) Google Drive remote is called
+`drive:`, you would run
+
+    rclone backend -o config drives drive:
+
+This would produce something like this:
+
+    [My Drive]
+    type = alias
+    remote = drive,team_drive=0ABCDEF-01234567890,root_folder_id=:
+
+    [Test Drive]
+    type = alias
+    remote = drive,team_drive=0ABCDEFabcdefghijkl,root_folder_id=:
+
+    [AllDrives]
+    type = combine
+    upstreams = "My Drive=My Drive:" "Test Drive=Test Drive:"
+
+If you then add that config to your config file (find it with `rclone
+config file`) you can access all the shared drives in one place
+with the `AllDrives:` remote.
+
+See [the Google Drive docs](/drive/#drives) for full info.
+
+{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/combine/combine.go then run make backenddocs" >}}
+### Standard options
+
+Here are the standard options specific to combine (Combine several remotes into one).
+
+#### --combine-upstreams
+
+Upstreams for combining
+
+These should be in the form
+
+    dir=remote:path dir2=remote2:path
+
+The part before the = is the root directory and the part after is the
+remote to put there.
+
+Embedded spaces can be added using quotes
+
+    "dir=remote:path with space" "dir2=remote2:path with space"
+
+
+
+Properties:
+
+- Config:      upstreams
+- Env Var:     RCLONE_COMBINE_UPSTREAMS
+- Type:        SpaceSepList
+- Default:
+
+{{< rem autogenerated options stop >}}
diff --git a/docs/content/docs.md b/docs/content/docs.md
index ac714c0f7..9be5038b8 100644
--- a/docs/content/docs.md
+++ b/docs/content/docs.md
@@ -37,6 +37,7 @@ See the following for detailed instructions for
   * [Chunker](/chunker/) - transparently splits large files for other remotes
   * [Citrix ShareFile](/sharefile/)
   * [Compress](/compress/)
+  * [Combine](/combine/)
   * [Crypt](/crypt/) - to encrypt other remotes
   * [DigitalOcean Spaces](/s3/#digitalocean-spaces)
   * [Digi Storage](/koofr/#digi-storage)
diff --git a/docs/layouts/chrome/navbar.html b/docs/layouts/chrome/navbar.html
index 552b976ca..32c05d3e0 100644
--- a/docs/layouts/chrome/navbar.html
+++ b/docs/layouts/chrome/navbar.html
@@ -60,6 +60,7 @@
                 Box
                 Chunker (splits large files)
                 Compress (transparent gzip compression)
+                Combine (remotes into a directory tree)
                 Citrix ShareFile
                 Crypt (encrypts the others)
                 Digi Storage
diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml
index 1b6bb9e1d..f0abff007 100644
--- a/fstest/test_all/config.yaml
+++ b/fstest/test_all/config.yaml
@@ -91,6 +91,9 @@ backends:
     fastlist: true
     maxfile: 1k
 ## end chunker
+  - backend: "combine"
+    remote: "TestCombine:dir1"
+    fastlist: false
 ## begin compress
   - backend: "compress"
     remote: "TestCompress:"