mirror of
https://github.com/rclone/rclone.git
synced 2024-11-25 17:57:44 +08:00
84191ac6dc
When a file has its modtime set while it is open we delay setting the modtime until the file is closed. The file is then uploaded in Flush. In Release we check the cached file has been uploaded by comparing modtimes and or hashes and upload it again if it has changed. Before this change we forgot to change the time on the cached file when we updated the time file on the object, so this mean that Release reset the time to the wrong time and uploaded the file again on remotes which don't support hashes (eg crypt). The fix was to set the modtime of the cached file at the same time we set the modtime of the remote object. This means that the files check as identical in Release so it doesn't try to upload the file. This means that we avoid a double upload and the modtime is correct. See: https://forum.rclone.org/t/modification-time-with-vfs-cache/13906/8
598 lines
14 KiB
Go
598 lines
14 KiB
Go
// This deals with caching of files locally
|
|
|
|
package vfs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/djherbis/times"
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
fscache "github.com/rclone/rclone/fs/cache"
|
|
"github.com/rclone/rclone/fs/config"
|
|
)
|
|
|
|
// CacheMode controls the functionality of the cache
|
|
type CacheMode byte
|
|
|
|
// CacheMode options
|
|
const (
|
|
CacheModeOff CacheMode = iota // cache nothing - return errors for writes which can't be satisfied
|
|
CacheModeMinimal // cache only the minimum, eg read/write opens
|
|
CacheModeWrites // cache all files opened with write intent
|
|
CacheModeFull // cache all files opened in any mode
|
|
)
|
|
|
|
var cacheModeToString = []string{
|
|
CacheModeOff: "off",
|
|
CacheModeMinimal: "minimal",
|
|
CacheModeWrites: "writes",
|
|
CacheModeFull: "full",
|
|
}
|
|
|
|
// String turns a CacheMode into a string
|
|
func (l CacheMode) String() string {
|
|
if l >= CacheMode(len(cacheModeToString)) {
|
|
return fmt.Sprintf("CacheMode(%d)", l)
|
|
}
|
|
return cacheModeToString[l]
|
|
}
|
|
|
|
// Set a CacheMode
|
|
func (l *CacheMode) Set(s string) error {
|
|
for n, name := range cacheModeToString {
|
|
if s != "" && name == s {
|
|
*l = CacheMode(n)
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Errorf("Unknown cache mode level %q", s)
|
|
}
|
|
|
|
// Type of the value
|
|
func (l *CacheMode) Type() string {
|
|
return "CacheMode"
|
|
}
|
|
|
|
// cache opened files
|
|
type cache struct {
|
|
f fs.Fs // fs for the cache directory
|
|
opt *Options // vfs Options
|
|
root string // root of the cache directory
|
|
itemMu sync.Mutex // protects the following variables
|
|
item map[string]*cacheItem // files/directories in the cache
|
|
used int64 // total size of files in the cache
|
|
}
|
|
|
|
// cacheItem is stored in the item map
|
|
type cacheItem struct {
|
|
opens int // number of times file is open
|
|
atime time.Time // last time file was accessed
|
|
isFile bool // if this is a file or a directory
|
|
size int64 // size of the cached item
|
|
}
|
|
|
|
// newCacheItem returns an item for the cache
|
|
func newCacheItem(isFile bool) *cacheItem {
|
|
return &cacheItem{atime: time.Now(), isFile: isFile}
|
|
}
|
|
|
|
// newCache creates a new cache heirachy for f
|
|
//
|
|
// This starts background goroutines which can be cancelled with the
|
|
// context passed in.
|
|
func newCache(ctx context.Context, f fs.Fs, opt *Options) (*cache, error) {
|
|
fRoot := filepath.FromSlash(f.Root())
|
|
if runtime.GOOS == "windows" {
|
|
if strings.HasPrefix(fRoot, `\\?`) {
|
|
fRoot = fRoot[3:]
|
|
}
|
|
fRoot = strings.Replace(fRoot, ":", "", -1)
|
|
}
|
|
root := filepath.Join(config.CacheDir, "vfs", f.Name(), fRoot)
|
|
fs.Debugf(nil, "vfs cache root is %q", root)
|
|
|
|
f, err := fscache.Get(root)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create cache remote")
|
|
}
|
|
|
|
c := &cache{
|
|
f: f,
|
|
opt: opt,
|
|
root: root,
|
|
item: make(map[string]*cacheItem),
|
|
}
|
|
|
|
go c.cleaner(ctx)
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// findParent returns the parent directory of name, or "" for the root
|
|
func findParent(name string) string {
|
|
parent := filepath.Dir(name)
|
|
if parent == "." || parent == "/" {
|
|
parent = ""
|
|
}
|
|
return parent
|
|
}
|
|
|
|
// clean returns the cleaned version of name for use in the index map
|
|
func clean(name string) string {
|
|
name = strings.Trim(name, "/")
|
|
name = filepath.Clean(name)
|
|
if name == "." || name == "/" {
|
|
name = ""
|
|
}
|
|
return name
|
|
}
|
|
|
|
// toOSPath turns a remote relative name into an OS path in the cache
|
|
func (c *cache) toOSPath(name string) string {
|
|
return filepath.Join(c.root, filepath.FromSlash(name))
|
|
}
|
|
|
|
// mkdir makes the directory for name in the cache and returns an os
|
|
// path for the file
|
|
func (c *cache) mkdir(name string) (string, error) {
|
|
parent := findParent(name)
|
|
leaf := filepath.Base(name)
|
|
parentPath := c.toOSPath(parent)
|
|
err := os.MkdirAll(parentPath, 0700)
|
|
if err != nil {
|
|
return "", errors.Wrap(err, "make cache directory failed")
|
|
}
|
|
c.cacheDir(parent)
|
|
return filepath.Join(parentPath, leaf), nil
|
|
}
|
|
|
|
// _get gets name from the cache or creates a new one
|
|
//
|
|
// It returns the item and found as to whether this item was found in
|
|
// the cache (or just created).
|
|
//
|
|
// name should be a remote path not an osPath
|
|
//
|
|
// must be called with itemMu held
|
|
func (c *cache) _get(isFile bool, name string) (item *cacheItem, found bool) {
|
|
item = c.item[name]
|
|
found = item != nil
|
|
if !found {
|
|
item = newCacheItem(isFile)
|
|
c.item[name] = item
|
|
}
|
|
return item, found
|
|
}
|
|
|
|
// opens returns the number of opens that are on the file
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) opens(name string) int {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
defer c.itemMu.Unlock()
|
|
item := c.item[name]
|
|
if item == nil {
|
|
return 0
|
|
}
|
|
return item.opens
|
|
}
|
|
|
|
// get gets name from the cache or creates a new one
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) get(name string) *cacheItem {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
item, _ := c._get(true, name)
|
|
c.itemMu.Unlock()
|
|
return item
|
|
}
|
|
|
|
// updateStat sets the atime of the name to that passed in if it is
|
|
// newer than the existing or there isn't an existing time.
|
|
//
|
|
// it also sets the size
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) updateStat(name string, when time.Time, size int64) {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
item, found := c._get(true, name)
|
|
if !found || when.Sub(item.atime) > 0 {
|
|
fs.Debugf(name, "updateTime: setting atime to %v", when)
|
|
item.atime = when
|
|
}
|
|
item.size = size
|
|
c.itemMu.Unlock()
|
|
}
|
|
|
|
// _open marks name as open, must be called with the lock held
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) _open(isFile bool, name string) {
|
|
for {
|
|
item, _ := c._get(isFile, name)
|
|
item.opens++
|
|
item.atime = time.Now()
|
|
if name == "" {
|
|
break
|
|
}
|
|
isFile = false
|
|
name = findParent(name)
|
|
}
|
|
}
|
|
|
|
// open marks name as open
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) open(name string) {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
c._open(true, name)
|
|
c.itemMu.Unlock()
|
|
}
|
|
|
|
// cacheDir marks a directory and its parents as being in the cache
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) cacheDir(name string) {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
defer c.itemMu.Unlock()
|
|
for {
|
|
item := c.item[name]
|
|
if item != nil {
|
|
break
|
|
}
|
|
c.item[name] = newCacheItem(false)
|
|
if name == "" {
|
|
break
|
|
}
|
|
name = findParent(name)
|
|
}
|
|
}
|
|
|
|
// exists checks to see if the file exists in the cache or not
|
|
func (c *cache) exists(name string) bool {
|
|
osPath := c.toOSPath(name)
|
|
fi, err := os.Stat(osPath)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
// checks for non-regular files (e.g. directories, symlinks, devices, etc.)
|
|
if !fi.Mode().IsRegular() {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// renames the file in cache
|
|
func (c *cache) rename(name string, newName string) (err error) {
|
|
osOldPath := c.toOSPath(name)
|
|
osNewPath := c.toOSPath(newName)
|
|
sfi, err := os.Stat(osOldPath)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "Failed to stat source: %s", osOldPath)
|
|
}
|
|
if !sfi.Mode().IsRegular() {
|
|
// cannot copy non-regular files (e.g., directories, symlinks, devices, etc.)
|
|
return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String())
|
|
}
|
|
dfi, err := os.Stat(osNewPath)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath)
|
|
}
|
|
parent := findParent(osNewPath)
|
|
err = os.MkdirAll(parent, 0700)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "Failed to create parent dir: %s", parent)
|
|
}
|
|
} else {
|
|
if !(dfi.Mode().IsRegular()) {
|
|
return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String())
|
|
}
|
|
if os.SameFile(sfi, dfi) {
|
|
return nil
|
|
}
|
|
}
|
|
if err = os.Rename(osOldPath, osNewPath); err != nil {
|
|
return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath)
|
|
}
|
|
// Rename the cache item
|
|
c.itemMu.Lock()
|
|
if oldItem, ok := c.item[name]; ok {
|
|
c.item[newName] = oldItem
|
|
delete(c.item, name)
|
|
}
|
|
c.itemMu.Unlock()
|
|
fs.Infof(name, "Renamed in cache")
|
|
return nil
|
|
}
|
|
|
|
// _close marks name as closed - must be called with the lock held
|
|
func (c *cache) _close(isFile bool, name string) {
|
|
for {
|
|
item, _ := c._get(isFile, name)
|
|
item.opens--
|
|
item.atime = time.Now()
|
|
if item.opens < 0 {
|
|
fs.Errorf(name, "cache: double close")
|
|
}
|
|
osPath := c.toOSPath(name)
|
|
fi, err := os.Stat(osPath)
|
|
// Update the size on close
|
|
if err == nil && !fi.IsDir() {
|
|
item.size = fi.Size()
|
|
}
|
|
if name == "" {
|
|
break
|
|
}
|
|
isFile = false
|
|
name = findParent(name)
|
|
}
|
|
}
|
|
|
|
// close marks name as closed
|
|
//
|
|
// name should be a remote path not an osPath
|
|
func (c *cache) close(name string) {
|
|
name = clean(name)
|
|
c.itemMu.Lock()
|
|
c._close(true, name)
|
|
c.itemMu.Unlock()
|
|
}
|
|
|
|
// remove should be called if name is deleted
|
|
func (c *cache) remove(name string) {
|
|
osPath := c.toOSPath(name)
|
|
err := os.Remove(osPath)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
fs.Errorf(name, "Failed to remove from cache: %v", err)
|
|
} else {
|
|
fs.Infof(name, "Removed from cache")
|
|
}
|
|
}
|
|
|
|
// removeDir should be called if dir is deleted and returns true if
|
|
// the directory is gone.
|
|
func (c *cache) removeDir(dir string) bool {
|
|
osPath := c.toOSPath(dir)
|
|
err := os.Remove(osPath)
|
|
if err == nil || os.IsNotExist(err) {
|
|
if err == nil {
|
|
fs.Debugf(dir, "Removed empty directory")
|
|
}
|
|
return true
|
|
}
|
|
if !os.IsExist(err) {
|
|
fs.Errorf(dir, "Failed to remove cached dir: %v", err)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// setModTime should be called to set the modification time of the cache file
|
|
func (c *cache) setModTime(name string, modTime time.Time) {
|
|
osPath := c.toOSPath(name)
|
|
err := os.Chtimes(osPath, modTime, modTime)
|
|
if err != nil {
|
|
fs.Errorf(name, "Failed to set modification time of cached file: %v", err)
|
|
}
|
|
}
|
|
|
|
// cleanUp empties the cache of everything
|
|
func (c *cache) cleanUp() error {
|
|
return os.RemoveAll(c.root)
|
|
}
|
|
|
|
// walk walks the cache calling the function
|
|
func (c *cache) walk(fn func(osPath string, fi os.FileInfo, name string) error) error {
|
|
return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Find path relative to the cache root
|
|
name, err := filepath.Rel(c.root, osPath)
|
|
if err != nil {
|
|
return errors.Wrap(err, "filepath.Rel failed in walk")
|
|
}
|
|
if name == "." {
|
|
name = ""
|
|
}
|
|
// And convert into slashes
|
|
name = filepath.ToSlash(name)
|
|
|
|
return fn(osPath, fi, name)
|
|
})
|
|
}
|
|
|
|
// updateStats walks the cache updating any atimes and sizes it finds
|
|
//
|
|
// it also updates used
|
|
func (c *cache) updateStats() error {
|
|
var newUsed int64
|
|
err := c.walk(func(osPath string, fi os.FileInfo, name string) error {
|
|
if !fi.IsDir() {
|
|
// Update the atime with that of the file
|
|
atime := times.Get(fi).AccessTime()
|
|
c.updateStat(name, atime, fi.Size())
|
|
newUsed += fi.Size()
|
|
} else {
|
|
c.cacheDir(name)
|
|
}
|
|
return nil
|
|
})
|
|
c.itemMu.Lock()
|
|
c.used = newUsed
|
|
c.itemMu.Unlock()
|
|
return err
|
|
}
|
|
|
|
// purgeOld gets rid of any files that are over age
|
|
func (c *cache) purgeOld(maxAge time.Duration) {
|
|
c._purgeOld(maxAge, c.remove)
|
|
}
|
|
|
|
func (c *cache) _purgeOld(maxAge time.Duration, remove func(name string)) {
|
|
c.itemMu.Lock()
|
|
defer c.itemMu.Unlock()
|
|
cutoff := time.Now().Add(-maxAge)
|
|
for name, item := range c.item {
|
|
if item.isFile && item.opens == 0 {
|
|
// If not locked and access time too long ago - delete the file
|
|
dt := item.atime.Sub(cutoff)
|
|
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt)
|
|
if dt < 0 {
|
|
remove(name)
|
|
// Remove the entry
|
|
delete(c.item, name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Purge any empty directories
|
|
func (c *cache) purgeEmptyDirs() {
|
|
c._purgeEmptyDirs(c.removeDir)
|
|
}
|
|
|
|
func (c *cache) _purgeEmptyDirs(removeDir func(name string) bool) {
|
|
c.itemMu.Lock()
|
|
defer c.itemMu.Unlock()
|
|
var dirs []string
|
|
for name, item := range c.item {
|
|
if !item.isFile && item.opens == 0 {
|
|
dirs = append(dirs, name)
|
|
}
|
|
}
|
|
// remove empty directories in reverse alphabetical order
|
|
sort.Strings(dirs)
|
|
for i := len(dirs) - 1; i >= 0; i-- {
|
|
dir := dirs[i]
|
|
// Remove the entry
|
|
if removeDir(dir) {
|
|
delete(c.item, dir)
|
|
}
|
|
}
|
|
}
|
|
|
|
// This is a cacheItem with a name for sorting
|
|
type cacheNamedItem struct {
|
|
name string
|
|
item *cacheItem
|
|
}
|
|
type cacheNamedItems []cacheNamedItem
|
|
|
|
func (v cacheNamedItems) Len() int { return len(v) }
|
|
func (v cacheNamedItems) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
|
|
func (v cacheNamedItems) Less(i, j int) bool { return v[i].item.atime.Before(v[j].item.atime) }
|
|
|
|
// Remove any files that are over quota starting from the
|
|
// oldest first
|
|
func (c *cache) purgeOverQuota(quota int64) {
|
|
c._purgeOverQuota(quota, c.remove)
|
|
}
|
|
|
|
func (c *cache) _purgeOverQuota(quota int64, remove func(name string)) {
|
|
c.itemMu.Lock()
|
|
defer c.itemMu.Unlock()
|
|
|
|
if quota <= 0 || c.used < quota {
|
|
return
|
|
}
|
|
|
|
var items cacheNamedItems
|
|
|
|
// Make a slice of unused files
|
|
for name, item := range c.item {
|
|
if item.isFile && item.opens == 0 {
|
|
items = append(items, cacheNamedItem{
|
|
name: name,
|
|
item: item,
|
|
})
|
|
}
|
|
}
|
|
sort.Sort(items)
|
|
|
|
// Remove items until the quota is OK
|
|
for _, item := range items {
|
|
if c.used < quota {
|
|
break
|
|
}
|
|
remove(item.name)
|
|
// Remove the entry
|
|
delete(c.item, item.name)
|
|
c.used -= item.item.size
|
|
}
|
|
}
|
|
|
|
// clean empties the cache of stuff if it can
|
|
func (c *cache) clean() {
|
|
// Cache may be empty so end
|
|
_, err := os.Stat(c.root)
|
|
if os.IsNotExist(err) {
|
|
return
|
|
}
|
|
|
|
c.itemMu.Lock()
|
|
oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
|
|
c.itemMu.Unlock()
|
|
|
|
// first walk the FS to update the atimes and sizes
|
|
err = c.updateStats()
|
|
if err != nil {
|
|
fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err)
|
|
}
|
|
|
|
// Remove any files that are over age
|
|
c.purgeOld(c.opt.CacheMaxAge)
|
|
|
|
// Now remove any files that are over quota starting from the
|
|
// oldest first
|
|
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
|
|
|
|
// Remove any empty directories
|
|
c.purgeEmptyDirs()
|
|
|
|
// Stats
|
|
c.itemMu.Lock()
|
|
newItems, newUsed := len(c.item), fs.SizeSuffix(c.used)
|
|
c.itemMu.Unlock()
|
|
|
|
fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed)
|
|
}
|
|
|
|
// cleaner calls clean at regular intervals
|
|
//
|
|
// doesn't return until context is cancelled
|
|
func (c *cache) cleaner(ctx context.Context) {
|
|
if c.opt.CachePollInterval <= 0 {
|
|
fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0")
|
|
return
|
|
}
|
|
// Start cleaning the cache immediately
|
|
c.clean()
|
|
// Then every interval specified
|
|
timer := time.NewTicker(c.opt.CachePollInterval)
|
|
defer timer.Stop()
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
c.clean()
|
|
case <-ctx.Done():
|
|
fs.Debugf(nil, "cache cleaner exiting")
|
|
return
|
|
}
|
|
}
|
|
}
|