// Package vfscache deals with caching of files locally for the VFS layer package vfscache import ( "context" "fmt" "os" "path" "path/filepath" "runtime" "sort" "strings" "sync" "time" sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify" "github.com/pkg/errors" "github.com/rclone/rclone/fs" fscache "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/vfs/vfscache/writeback" "github.com/rclone/rclone/vfs/vfscommon" ) // NB as Cache and Item are tightly linked it is necessary to have a // total lock ordering between them. So Cache.mu must always be // taken before Item.mu to avoid deadlocks. // // Cache may call into Item but care is needed if Item calls Cache // FIXME need to purge cache nodes which don't have backing files and aren't dirty // these may get created by the VFS layer or may be orphans from reload() // Cache opened files type Cache struct { // read only - no locking needed to read these fremote fs.Fs // fs for the remote we are caching fcache fs.Fs // fs for the cache directory fcacheMeta fs.Fs // fs for the cache metadata directory opt *vfscommon.Options // vfs Options root string // root of the cache directory metaRoot string // root of the cache metadata directory hashType hash.Type // hash to use locally and remotely hashOption *fs.HashesOption // corresponding OpenOption writeback *writeback.WriteBack // holds Items for writeback avFn AddVirtualFn // if set, can be called to add dir entries mu sync.Mutex // protects the following variables cond *sync.Cond // cond lock for synchronous cache cleaning item map[string]*Item // files/directories in the cache errItems map[string]error // items in error state used int64 // total size of files in the cache outOfSpace bool // out of space cleanerKicked bool // some thread kicked the cleaner upon out of space kickerMu sync.Mutex // mutex for clearnerKicked kick chan struct{} // channel for kicking clear to start } // AddVirtualFn if registered by the WithAddVirtual method, can be // called to register the object or directory at remote as a virtual // entry in directory listings. // // This is used when reloading the Cache and uploading items need to // go into the directory tree. type AddVirtualFn func(remote string, size int64, isDir bool) error // New creates a new cache heirachy for fremote // // This starts background goroutines which can be cancelled with the // context passed in. func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVirtualFn) (*Cache, error) { fRoot := filepath.FromSlash(fremote.Root()) if runtime.GOOS == "windows" { if strings.HasPrefix(fRoot, `\\?`) { fRoot = fRoot[3:] } fRoot = strings.Replace(fRoot, ":", "", -1) } root := file.UNCPath(filepath.Join(config.CacheDir, "vfs", fremote.Name(), fRoot)) fs.Debugf(nil, "vfs cache: root is %q", root) metaRoot := file.UNCPath(filepath.Join(config.CacheDir, "vfsMeta", fremote.Name(), fRoot)) fs.Debugf(nil, "vfs cache: metadata root is %q", root) fcache, err := fscache.Get(root) if err != nil { return nil, errors.Wrap(err, "failed to create cache remote") } fcacheMeta, err := fscache.Get(root) if err != nil { return nil, errors.Wrap(err, "failed to create cache meta remote") } hashType, hashOption := operations.CommonHash(fcache, fremote) c := &Cache{ fremote: fremote, fcache: fcache, fcacheMeta: fcacheMeta, opt: opt, root: root, metaRoot: metaRoot, item: make(map[string]*Item), errItems: make(map[string]error), hashType: hashType, hashOption: hashOption, writeback: writeback.New(ctx, opt), avFn: avFn, } // Make sure cache directories exist _, err = c.mkdir("") if err != nil { return nil, errors.Wrap(err, "failed to make cache directory") } // load in the cache and metadata off disk err = c.reload(ctx) if err != nil { return nil, errors.Wrap(err, "failed to load cache") } // Remove any empty directories c.purgeEmptyDirs() // Create a channel for cleaner to be kicked upon out of space con c.kick = make(chan struct{}, 1) c.cond = sync.NewCond(&c.mu) go c.cleaner(ctx) return c, nil } // clean returns the cleaned version of name for use in the index map // // name should be a remote path not an osPath func clean(name string) string { name = strings.Trim(name, "/") name = path.Clean(name) if name == "." || name == "/" { name = "" } return name } // toOSPath turns a remote relative name into an OS path in the cache func (c *Cache) toOSPath(name string) string { return filepath.Join(c.root, filepath.FromSlash(name)) } // toOSPathMeta turns a remote relative name into an OS path in the // cache for the metadata func (c *Cache) toOSPathMeta(name string) string { return filepath.Join(c.metaRoot, filepath.FromSlash(name)) } // mkdir makes the directory for name in the cache and returns an os // path for the file func (c *Cache) mkdir(name string) (string, error) { parent := vfscommon.FindParent(name) leaf := filepath.Base(name) parentPath := c.toOSPath(parent) err := os.MkdirAll(parentPath, 0700) if err != nil { return "", errors.Wrap(err, "make cache directory failed") } parentPathMeta := c.toOSPathMeta(parent) err = os.MkdirAll(parentPathMeta, 0700) if err != nil { return "", errors.Wrap(err, "make cache meta directory failed") } return filepath.Join(parentPath, leaf), nil } // _get gets name from the cache or creates a new one // // It returns the item and found as to whether this item was found in // the cache (or just created). // // name should be a remote path not an osPath // // must be called with mu held func (c *Cache) _get(name string) (item *Item, found bool) { item = c.item[name] found = item != nil if !found { item = newItem(c, name) c.item[name] = item } return item, found } // put puts item under name in the cache // // It returns an old item if there was one or nil if not. // // name should be a remote path not an osPath func (c *Cache) put(name string, item *Item) (oldItem *Item) { name = clean(name) c.mu.Lock() oldItem = c.item[name] if oldItem != item { c.item[name] = item } else { oldItem = nil } c.mu.Unlock() return oldItem } // InUse returns whether the name is in use in the cache // // name should be a remote path not an osPath func (c *Cache) InUse(name string) bool { name = clean(name) c.mu.Lock() item := c.item[name] c.mu.Unlock() if item == nil { return false } return item.inUse() } // DirtyItem the Item if it exists in the cache and is Dirty // // name should be a remote path not an osPath func (c *Cache) DirtyItem(name string) (item *Item) { name = clean(name) c.mu.Lock() defer c.mu.Unlock() item = c.item[name] if item != nil && !item.IsDirty() { item = nil } return item } // get gets a file name from the cache or creates a new one // // It returns the item and found as to whether this item was found in // the cache (or just created). // // name should be a remote path not an osPath func (c *Cache) get(name string) (item *Item, found bool) { name = clean(name) c.mu.Lock() item, found = c._get(name) c.mu.Unlock() return item, found } // Item gets a cache item for name // // To use it item.Open will need to be called // // name should be a remote path not an osPath func (c *Cache) Item(name string) (item *Item) { item, _ = c.get(name) return item } // Exists checks to see if the file exists in the cache or not. // // This is done by bringing the item into the cache which will // validate the backing file and metadata and then asking if the Item // exists or not. func (c *Cache) Exists(name string) bool { item, _ := c.get(name) return item.Exists() } // rename with os.Rename and more checking func rename(osOldPath, osNewPath string) error { sfi, err := os.Stat(osOldPath) if err != nil { // Just do nothing if the source does not exist if os.IsNotExist(err) { return nil } return errors.Wrapf(err, "Failed to stat source: %s", osOldPath) } if !sfi.Mode().IsRegular() { // cannot copy non-regular files (e.g., directories, symlinks, devices, etc.) return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String()) } dfi, err := os.Stat(osNewPath) if err != nil { if !os.IsNotExist(err) { return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath) } parent := vfscommon.OsFindParent(osNewPath) err = os.MkdirAll(parent, 0700) if err != nil { return errors.Wrapf(err, "Failed to create parent dir: %s", parent) } } else { if !(dfi.Mode().IsRegular()) { return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String()) } if os.SameFile(sfi, dfi) { return nil } } if err = os.Rename(osOldPath, osNewPath); err != nil { return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath) } return nil } // Rename the item in cache func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error) { item, _ := c.get(name) err = item.rename(name, newName, newObj) if err != nil { return err } // Move the item in the cache c.mu.Lock() if item, ok := c.item[name]; ok { c.item[newName] = item delete(c.item, name) } c.mu.Unlock() fs.Infof(name, "vfs cache: renamed in cache to %q", newName) return nil } // Remove should be called if name is deleted // // This returns true if the file was in the transfer queue so may not // have completedly uploaded yet. func (c *Cache) Remove(name string) (wasWriting bool) { name = clean(name) c.mu.Lock() item := c.item[name] if item != nil { delete(c.item, name) } c.mu.Unlock() if item == nil { return false } return item.remove("file deleted") } // SetModTime should be called to set the modification time of the cache file func (c *Cache) SetModTime(name string, modTime time.Time) { item, _ := c.get(name) item.setModTime(modTime) } // CleanUp empties the cache of everything func (c *Cache) CleanUp() error { err1 := os.RemoveAll(c.root) err2 := os.RemoveAll(c.metaRoot) if err1 != nil { return err1 } return err2 } // walk walks the cache calling the function func (c *Cache) walk(dir string, fn func(osPath string, fi os.FileInfo, name string) error) error { return filepath.Walk(dir, func(osPath string, fi os.FileInfo, err error) error { if err != nil { return err } // Find path relative to the cache root name, err := filepath.Rel(dir, osPath) if err != nil { return errors.Wrap(err, "filepath.Rel failed in walk") } if name == "." { name = "" } // And convert into slashes name = filepath.ToSlash(name) return fn(osPath, fi, name) }) } // reload walks the cache loading metadata files // // It iterates the files first then metadata trees. It doesn't expect // to find any new items iterating the metadata but it will clear up // orphan files. func (c *Cache) reload(ctx context.Context) error { for _, dir := range []string{c.root, c.metaRoot} { err := c.walk(dir, func(osPath string, fi os.FileInfo, name string) error { if fi.IsDir() { return nil } item, found := c.get(name) if !found { err := item.reload(ctx) if err != nil { fs.Errorf(name, "vfs cache: failed to reload item: %v", err) } } return nil }) if err != nil { return errors.Wrapf(err, "failed to walk cache %q", dir) } } return nil } // KickCleaner kicks cache cleaner upon out of space situation func (c *Cache) KickCleaner() { /* Use a separate kicker mutex for the kick to go through without waiting for the cache mutex to avoid letting a thread kick again after the clearer just finished cleaning and unlock the cache mutex. */ fs.Debugf(nil, "vfs cache: at the beginning of KickCleaner") c.kickerMu.Lock() if !c.cleanerKicked { c.cleanerKicked = true fs.Debugf(nil, "vfs cache: in KickCleaner, ready to lock cache mutex") c.mu.Lock() c.outOfSpace = true fs.Logf(nil, "vfs cache: in KickCleaner, ready to kick cleaner") c.kick <- struct{}{} c.mu.Unlock() } c.kickerMu.Unlock() c.mu.Lock() for c.outOfSpace == true { fs.Debugf(nil, "vfs cache: in KickCleaner, looping on c.outOfSpace") c.cond.Wait() } fs.Debugf(nil, "vfs cache: in KickCleaner, leaving c.outOfSpace loop") c.mu.Unlock() } // removeNotInUse removes items not in use with a possible maxAge cutoff // called with cache mutex locked and up-to-date c.used (as we update it directly here) func (c *Cache) removeNotInUse(item *Item, maxAge time.Duration, emptyOnly bool) { removed, spaceFreed := item.RemoveNotInUse(maxAge, emptyOnly) // The item space might be freed even if we get an error after the cache file is removed // The item will not be removed or reset the cache data is dirty (DataDirty) c.used -= spaceFreed if removed { fs.Infof(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s was removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed) // Remove the entry delete(c.item, item.name) } else { fs.Debugf(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s not removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed) } return } // Retry failed resets during purgeClean() func (c *Cache) retryFailedResets() { // Some items may have failed to reset becasue there was not enough space // for saving the cache item's metadata. Redo the Reset()'s here now that // we may have some available space. if len(c.errItems) != 0 { fs.Debugf(nil, "vfs cache reset: before redoing reset errItems = %v", c.errItems) for itemName := range c.errItems { if retryItem, ok := c.item[itemName]; ok { _, _, err := retryItem.Reset() if err == nil || !fserrors.IsErrNoSpace(err) { // TODO: not trying to handle non-ENOSPC errors yet delete(c.errItems, itemName) } } else { // The retry item was deleted because it was closed. // No need to redo the failed reset now. delete(c.errItems, itemName) } } fs.Debugf(nil, "vfs cache reset: after redoing reset errItems = %v", c.errItems) } } func (c *Cache) purgeClean(quota int64) { c.mu.Lock() defer c.mu.Unlock() var items Items if quota <= 0 || c.used < quota { return } // Make a slice of clean cache files for _, item := range c.item { if !item.IsDataDirty() { items = append(items, item) } } sort.Sort(items) // Reset items until the quota is OK for _, item := range items { if c.used < quota { break } resetResult, spaceFreed, err := item.Reset() // The item space might be freed even if we get an error after the cache file is removed // The item will not be removed or reset if the cache data is dirty (DataDirty) c.used -= spaceFreed fs.Infof(nil, "vfs cache purgeClean item.Reset %s: %s, freed %d bytes", item.GetName(), resetResult.String(), spaceFreed) if resetResult == RemovedNotInUse { delete(c.item, item.name) } if err != nil { fs.Errorf(nil, "vfs cache purgeClean item.Reset %s reset failed, err = %v, freed %d bytes", item.GetName(), err, spaceFreed) c.errItems[item.name] = err } } // Resest outOfSpace without checking whether we have reduced cache space below the quota. // This allows some files to reduce their pendingAccesses count to allow them to be reset // in the next iteration of the purge cleaner loop. c.outOfSpace = false c.cond.Broadcast() } // purgeOld gets rid of any files that are over age func (c *Cache) purgeOld(maxAge time.Duration) { c.mu.Lock() defer c.mu.Unlock() // cutoff := time.Now().Add(-maxAge) for _, item := range c.item { c.removeNotInUse(item, maxAge, false) } if c.used < int64(c.opt.CacheMaxSize) { c.outOfSpace = false c.cond.Broadcast() } } // Purge any empty directories func (c *Cache) purgeEmptyDirs() { ctx := context.Background() err := operations.Rmdirs(ctx, c.fcache, "", true) if err != nil { fs.Errorf(c.fcache, "vfs cache: failed to remove empty directories from cache: %v", err) } err = operations.Rmdirs(ctx, c.fcacheMeta, "", true) if err != nil { fs.Errorf(c.fcache, "vfs cache: failed to remove empty directories from metadata cache: %v", err) } } // updateUsed updates c.used so it is accurate func (c *Cache) updateUsed() (used int64) { c.mu.Lock() defer c.mu.Unlock() newUsed := int64(0) for _, item := range c.item { newUsed += item.getDiskSize() } c.used = newUsed return newUsed } // Remove clean cache files that are not open until the total space // is reduced below quota starting from the oldest first func (c *Cache) purgeOverQuota(quota int64) { c.updateUsed() c.mu.Lock() defer c.mu.Unlock() if quota <= 0 || c.used < quota { return } var items Items // Make a slice of unused files for _, item := range c.item { if !item.inUse() { items = append(items, item) } } sort.Sort(items) // Remove items until the quota is OK for _, item := range items { c.removeNotInUse(item, 0, c.used <= quota) } if c.used < quota { c.outOfSpace = false c.cond.Broadcast() } } // clean empties the cache of stuff if it can func (c *Cache) clean(removeCleanFiles bool) { // Cache may be empty so end _, err := os.Stat(c.root) if os.IsNotExist(err) { return } c.mu.Lock() oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used) c.mu.Unlock() // loop cleaning the cache until we reach below cache quota for { // Remove any files that are over age c.purgeOld(c.opt.CacheMaxAge) // Now remove files not in use until cache size is below quota starting from the // oldest first c.purgeOverQuota(int64(c.opt.CacheMaxSize)) // removeCleanFiles indicates that we got ENOSPC error // We remove cache files that are not dirty if we are still avove the max cache size if removeCleanFiles { c.purgeClean(int64(c.opt.CacheMaxSize)) c.retryFailedResets() } else { break } used := c.updateUsed() if used <= int64(c.opt.CacheMaxSize) && len(c.errItems) == 0 { break } } // Was kicked? if removeCleanFiles { c.kickerMu.Lock() // Make sure this is called with cache mutex unlocked // Reenable io threads to kick me c.cleanerKicked = false c.kickerMu.Unlock() } // Stats c.mu.Lock() newItems, newUsed := len(c.item), fs.SizeSuffix(c.used) totalInUse := 0 for _, item := range c.item { if item.inUse() { totalInUse++ } } c.mu.Unlock() uploadsInProgress, uploadsQueued := c.writeback.Stats() stats := fmt.Sprintf("objects %d (was %d) in use %d, to upload %d, uploading %d, total size %v (was %v)", newItems, oldItems, totalInUse, uploadsQueued, uploadsInProgress, newUsed, oldUsed) fs.Infof(nil, "vfs cache: cleaned: %s", stats) if err = sysdnotify.Status(fmt.Sprintf("[%s] vfs cache: %s", time.Now().Format("15:04"), stats)); err != nil { fs.Errorf(nil, "vfs cache: updating systemd status with current stats failed: %s", err) } } // cleaner calls clean at regular intervals and upon being kicked for out-of-space condition // // doesn't return until context is cancelled func (c *Cache) cleaner(ctx context.Context) { if c.opt.CachePollInterval <= 0 { fs.Debugf(nil, "vfs cache: cleaning thread disabled because poll interval <= 0") return } // Start cleaning the cache immediately c.clean(false) // Then every interval specified timer := time.NewTicker(c.opt.CachePollInterval) defer timer.Stop() for { select { case <-c.kick: // a thread encountering ENOSPC kicked me c.clean(true) // remove inUse files that are clean (!item.info.Dirty) case <-timer.C: c.clean(false) // do not remove inUse files case <-ctx.Done(): fs.Debugf(nil, "vfs cache: cleaner exiting") return } } } // TotalInUse returns the number of items in the cache which are InUse func (c *Cache) TotalInUse() (n int) { c.mu.Lock() defer c.mu.Unlock() for _, item := range c.item { if item.inUse() { n++ } } return n } // Dump the cache into a string for debugging purposes func (c *Cache) Dump() string { if c == nil { return "Cache: \n" } c.mu.Lock() defer c.mu.Unlock() var out strings.Builder out.WriteString("Cache{\n") for name, item := range c.item { fmt.Fprintf(&out, "\t%q: %+v,\n", name, item) } out.WriteString("}\n") return out.String() } // AddVirtual adds a virtual directory entry by calling the addVirtual // callback if one has been registered. func (c *Cache) AddVirtual(remote string, size int64, isDir bool) error { if c.avFn == nil { return errors.New("no AddVirtual function registered") } return c.avFn(remote, size, isDir) }