vfs: keep track of directories in the cache also #1860

This makes managing empty directories more reliable.
This commit is contained in:
Nick Craig-Wood 2017-11-28 14:18:48 +00:00
parent 0978957a2e
commit c16ac697a9
3 changed files with 336 additions and 50 deletions

View File

@ -8,6 +8,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -66,18 +67,19 @@ type cache struct {
opt *Options // vfs Options opt *Options // vfs Options
root string // root of the cache directory root string // root of the cache directory
itemMu sync.Mutex // protects the next two maps itemMu sync.Mutex // protects the next two maps
item map[string]*cacheItem // files in the cache item map[string]*cacheItem // files/directories in the cache
} }
// cacheItem is stored in the item map // cacheItem is stored in the item map
type cacheItem struct { type cacheItem struct {
opens int // number of times file is open opens int // number of times file is open
atime time.Time // last time file was accessed atime time.Time // last time file was accessed
isFile bool // if this is a file or a directory
} }
// newCacheItem returns an item for the cache // newCacheItem returns an item for the cache
func newCacheItem() *cacheItem { func newCacheItem(isFile bool) *cacheItem {
return &cacheItem{atime: time.Now()} return &cacheItem{atime: time.Now(), isFile: isFile}
} }
// newCache creates a new cache heirachy for f // newCache creates a new cache heirachy for f
@ -112,47 +114,65 @@ func newCache(ctx context.Context, f fs.Fs, opt *Options) (*cache, error) {
return c, nil return c, nil
} }
// findParent returns the parent directory of name, or "" for the root
func findParent(name string) string {
parent := path.Dir(name)
if parent == "." || parent == "/" {
parent = ""
}
return parent
}
// toOSPath turns a remote relative name into an OS path in the cache
func (c *cache) toOSPath(name string) string {
return filepath.Join(c.root, filepath.FromSlash(name))
}
// mkdir makes the directory for name in the cache and returns an os // mkdir makes the directory for name in the cache and returns an os
// path for the file // path for the file
func (c *cache) mkdir(name string) (string, error) { func (c *cache) mkdir(name string) (string, error) {
parent := path.Dir(name) parent := findParent(name)
if parent == "." {
parent = ""
}
leaf := path.Base(name) leaf := path.Base(name)
parentPath := filepath.Join(c.root, filepath.FromSlash(parent)) parentPath := c.toOSPath(parent)
err := os.MkdirAll(parentPath, 0700) err := os.MkdirAll(parentPath, 0700)
if err != nil { if err != nil {
return "", errors.Wrap(err, "make cache directory failed") return "", errors.Wrap(err, "make cache directory failed")
} }
c.cacheDir(parent)
return filepath.Join(parentPath, leaf), nil return filepath.Join(parentPath, leaf), nil
} }
// _get gets name from the cache or creates a new one // _get gets name from the cache or creates a new one
// //
// name should be a remote path not an osPath
//
// must be called with itemMu held // must be called with itemMu held
func (c *cache) _get(name string) *cacheItem { func (c *cache) _get(isFile bool, name string) *cacheItem {
item := c.item[name] item := c.item[name]
if item == nil { if item == nil {
item = newCacheItem() item = newCacheItem(isFile)
c.item[name] = item c.item[name] = item
} }
return item return item
} }
// get gets name from the cache or creates a new one // get gets name from the cache or creates a new one
//
// name should be a remote path not an osPath
func (c *cache) get(name string) *cacheItem { func (c *cache) get(name string) *cacheItem {
c.itemMu.Lock() c.itemMu.Lock()
item := c._get(name) item := c._get(true, name)
c.itemMu.Unlock() c.itemMu.Unlock()
return item return item
} }
// updateTime sets the atime of the name to that passed in if it is // updateTime sets the atime of the name to that passed in if it is
// newer than the existing or there isn't an existing time. // newer than the existing or there isn't an existing time.
//
// name should be a remote path not an osPath
func (c *cache) updateTime(name string, when time.Time) { func (c *cache) updateTime(name string, when time.Time) {
c.itemMu.Lock() c.itemMu.Lock()
item := c._get(name) item := c._get(true, name)
if when.Sub(item.atime) > 0 { if when.Sub(item.atime) > 0 {
fs.Debugf(name, "updateTime: setting atime to %v", when) fs.Debugf(name, "updateTime: setting atime to %v", when)
item.atime = when item.atime = when
@ -160,30 +180,79 @@ func (c *cache) updateTime(name string, when time.Time) {
c.itemMu.Unlock() c.itemMu.Unlock()
} }
// open marks name as open // _open marks name as open, must be called with the lock held
func (c *cache) open(name string) { //
c.itemMu.Lock() // name should be a remote path not an osPath
item := c._get(name) func (c *cache) _open(isFile bool, name string) {
for {
item := c._get(isFile, name)
item.opens++ item.opens++
item.atime = time.Now() item.atime = time.Now()
if name == "" {
break
}
isFile = false
name = findParent(name)
}
}
// open marks name as open
//
// name should be a remote path not an osPath
func (c *cache) open(name string) {
c.itemMu.Lock()
c._open(true, name)
c.itemMu.Unlock() c.itemMu.Unlock()
} }
// close marks name as closed // cacheDir marks a directory and its parents as being in the cache
func (c *cache) close(name string) { //
// name should be a remote path not an osPath
func (c *cache) cacheDir(name string) {
c.itemMu.Lock() c.itemMu.Lock()
item := c._get(name) defer c.itemMu.Unlock()
for {
item := c.item[name]
if item != nil {
break
}
c.item[name] = newCacheItem(false)
if name == "" {
break
}
name = findParent(name)
}
}
// _close marks name as closed - must be called with the lock held
func (c *cache) _close(isFile bool, name string) {
for {
item := c._get(isFile, name)
item.opens-- item.opens--
item.atime = time.Now() item.atime = time.Now()
if item.opens < 0 { if item.opens < 0 {
fs.Errorf(name, "cache: double close") fs.Errorf(name, "cache: double close")
} }
if name == "" {
break
}
isFile = false
name = findParent(name)
}
}
// close marks name as closed
//
// name should be a remote path not an osPath
func (c *cache) close(name string) {
c.itemMu.Lock()
c._close(true, name)
c.itemMu.Unlock() c.itemMu.Unlock()
} }
// remove should be called if name is deleted // remove should be called if name is deleted
func (c *cache) remove(name string) { func (c *cache) remove(name string) {
osPath := filepath.Join(c.root, filepath.FromSlash(name)) osPath := c.toOSPath(name)
err := os.Remove(osPath) err := os.Remove(osPath)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
fs.Errorf(name, "Failed to remove from cache: %v", err) fs.Errorf(name, "Failed to remove from cache: %v", err)
@ -192,29 +261,55 @@ func (c *cache) remove(name string) {
} }
} }
// removeDir should be called if dir is deleted and returns true if
// the directory is gone.
func (c *cache) removeDir(dir string) bool {
osPath := c.toOSPath(dir)
err := os.Remove(osPath)
if err == nil || os.IsNotExist(err) {
return true
}
if !os.IsExist(err) {
fs.Errorf(dir, "Failed to remove cached dir: %v", err)
}
return false
}
// cleanUp empties the cache of everything // cleanUp empties the cache of everything
func (c *cache) cleanUp() error { func (c *cache) cleanUp() error {
return os.RemoveAll(c.root) return os.RemoveAll(c.root)
} }
// updateAtimes walks the cache updating any atimes it finds // walk walks the cache calling the function
func (c *cache) updateAtimes() error { func (c *cache) walk(fn func(osPath string, fi os.FileInfo, name string) error) error {
return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error { return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error {
if err != nil { if err != nil {
return err return err
} }
if !fi.IsDir() {
// Find path relative to the cache root // Find path relative to the cache root
name, err := filepath.Rel(c.root, osPath) name, err := filepath.Rel(c.root, osPath)
if err != nil { if err != nil {
return errors.Wrap(err, "filepath.Rel failed in updatAtimes") return errors.Wrap(err, "filepath.Rel failed in walk")
}
if name == "." {
name = ""
} }
// And convert into slashes // And convert into slashes
name = filepath.ToSlash(name) name = filepath.ToSlash(name)
return fn(osPath, fi, name)
})
}
// updateAtimes walks the cache updating any atimes it finds
func (c *cache) updateAtimes() error {
return c.walk(func(osPath string, fi os.FileInfo, name string) error {
if !fi.IsDir() {
// Update the atime with that of the file // Update the atime with that of the file
atime := times.Get(fi).AccessTime() atime := times.Get(fi).AccessTime()
c.updateTime(name, atime) c.updateTime(name, atime)
} else {
c.cacheDir(name)
} }
return nil return nil
}) })
@ -222,20 +317,42 @@ func (c *cache) updateAtimes() error {
// purgeOld gets rid of any files that are over age // purgeOld gets rid of any files that are over age
func (c *cache) purgeOld(maxAge time.Duration) { func (c *cache) purgeOld(maxAge time.Duration) {
c._purgeOld(maxAge, c.remove, c.removeDir)
}
func (c *cache) _purgeOld(maxAge time.Duration, remove func(name string), removeDir func(name string) bool) {
c.itemMu.Lock() c.itemMu.Lock()
defer c.itemMu.Unlock() defer c.itemMu.Unlock()
cutoff := time.Now().Add(-maxAge) cutoff := time.Now().Add(-maxAge)
for name, item := range c.item { for name, item := range c.item {
if item.isFile && item.opens == 0 {
// If not locked and access time too long ago - delete the file // If not locked and access time too long ago - delete the file
dt := item.atime.Sub(cutoff) dt := item.atime.Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt) // fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt)
if item.opens == 0 && dt < 0 { if dt < 0 {
c.remove(name) remove(name)
// Remove the entry // Remove the entry
delete(c.item, name) delete(c.item, name)
} }
} }
} }
// now find any empty directories
var dirs []string
for name, item := range c.item {
if !item.isFile && item.opens == 0 {
dirs = append(dirs, name)
}
}
// remove empty directories in reverse alphabetical order
sort.Strings(dirs)
for i := len(dirs) - 1; i >= 0; i-- {
dir := dirs[i]
// Remove the entry
if removeDir(dir) {
delete(c.item, dir)
}
}
}
// clean empties the cache of stuff if it can // clean empties the cache of stuff if it can
func (c *cache) clean() { func (c *cache) clean() {
@ -253,14 +370,9 @@ func (c *cache) clean() {
fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err) fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err)
} }
// Now remove any files that are over age // Now remove any files that are over age and any empty
// directories
c.purgeOld(c.opt.CacheMaxAge) c.purgeOld(c.opt.CacheMaxAge)
// Now tidy up any empty directories
err = fs.Rmdirs(c.f, "")
if err != nil {
fs.Errorf(c.f, "Failed to remove empty directories from cache: %v", err)
}
} }
// cleaner calls clean at regular intervals // cleaner calls clean at regular intervals

View File

@ -1,10 +1,13 @@
package vfs package vfs
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strings"
"testing" "testing"
"time" "time"
@ -44,6 +47,18 @@ func TestCacheModeType(t *testing.T) {
assert.Equal(t, "string", m.Type()) assert.Equal(t, "string", m.Type())
} }
// convert c.item to a string
func itemAsString(c *cache) string {
c.itemMu.Lock()
defer c.itemMu.Unlock()
var out []string
for name, item := range c.item {
out = append(out, fmt.Sprintf("name=%q isFile=%v opens=%d", name, item.isFile, item.opens))
}
sort.Strings(out)
return strings.Join(out, "\n")
}
func TestCacheNew(t *testing.T) { func TestCacheNew(t *testing.T) {
r := fstest.NewRun(t) r := fstest.NewRun(t)
defer r.Finalise() defer r.Finalise()
@ -56,11 +71,13 @@ func TestCacheNew(t *testing.T) {
assert.Contains(t, c.root, "vfs") assert.Contains(t, c.root, "vfs")
assert.Contains(t, c.f.Root(), filepath.Base(r.Fremote.Root())) assert.Contains(t, c.f.Root(), filepath.Base(r.Fremote.Root()))
assert.Equal(t, "", itemAsString(c))
// mkdir // mkdir
p, err := c.mkdir("potato") p, err := c.mkdir("potato")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p)) assert.Equal(t, "potato", filepath.Base(p))
assert.Equal(t, `name="" isFile=false opens=0`, itemAsString(c))
fi, err := os.Stat(filepath.Dir(p)) fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err) require.NoError(t, err)
@ -87,7 +104,11 @@ func TestCacheNew(t *testing.T) {
assert.Equal(t, 0, item.opens) assert.Equal(t, 0, item.opens)
// open // open
assert.Equal(t, `name="" isFile=false opens=0
name="potato" isFile=true opens=0`, itemAsString(c))
c.open("potato") c.open("potato")
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
item = c.get("potato") item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second) assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 1, item.opens) assert.Equal(t, 1, item.opens)
@ -107,6 +128,8 @@ func TestCacheNew(t *testing.T) {
item.atime = time.Now().Add(-24 * time.Hour) item.atime = time.Now().Add(-24 * time.Hour)
err = c.updateAtimes() err = c.updateAtimes()
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
item = c.get("potato") item = c.get("potato")
assert.Equal(t, atime, item.atime) assert.Equal(t, atime, item.atime)
@ -116,8 +139,14 @@ func TestCacheNew(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// close // close
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
c.updateTime("potato", t2) c.updateTime("potato", t2)
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
c.close("potato") c.close("potato")
assert.Equal(t, `name="" isFile=false opens=0
name="potato" isFile=true opens=0`, itemAsString(c))
item = c.get("potato") item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second) assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 0, item.opens) assert.Equal(t, 0, item.opens)
@ -142,3 +171,148 @@ func TestCacheNew(t *testing.T) {
_, err = os.Stat(c.root) _, err = os.Stat(c.root)
assert.True(t, os.IsNotExist(err)) assert.True(t, os.IsNotExist(err))
} }
func TestCacheOpens(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := newCache(ctx, r.Fremote, &DefaultOpt)
require.NoError(t, err)
assert.Equal(t, "", itemAsString(c))
c.open("potato")
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
c.open("potato")
assert.Equal(t, `name="" isFile=false opens=2
name="potato" isFile=true opens=2`, itemAsString(c))
c.close("potato")
assert.Equal(t, `name="" isFile=false opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
c.close("potato")
assert.Equal(t, `name="" isFile=false opens=0
name="potato" isFile=true opens=0`, itemAsString(c))
c.open("potato")
c.open("a/b/c/d/one")
c.open("a/b/c/d/e/two")
c.open("a/b/c/d/e/f/three")
assert.Equal(t, `name="" isFile=false opens=4
name="a" isFile=false opens=3
name="a/b" isFile=false opens=3
name="a/b/c" isFile=false opens=3
name="a/b/c/d" isFile=false opens=3
name="a/b/c/d/e" isFile=false opens=2
name="a/b/c/d/e/f" isFile=false opens=1
name="a/b/c/d/e/f/three" isFile=true opens=1
name="a/b/c/d/e/two" isFile=true opens=1
name="a/b/c/d/one" isFile=true opens=1
name="potato" isFile=true opens=1`, itemAsString(c))
c.close("potato")
c.close("a/b/c/d/one")
c.close("a/b/c/d/e/two")
c.close("a/b/c/d/e/f/three")
assert.Equal(t, `name="" isFile=false opens=0
name="a" isFile=false opens=0
name="a/b" isFile=false opens=0
name="a/b/c" isFile=false opens=0
name="a/b/c/d" isFile=false opens=0
name="a/b/c/d/e" isFile=false opens=0
name="a/b/c/d/e/f" isFile=false opens=0
name="a/b/c/d/e/f/three" isFile=true opens=0
name="a/b/c/d/e/two" isFile=true opens=0
name="a/b/c/d/one" isFile=true opens=0
name="potato" isFile=true opens=0`, itemAsString(c))
}
func TestCacheCacheDir(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := newCache(ctx, r.Fremote, &DefaultOpt)
require.NoError(t, err)
assert.Equal(t, "", itemAsString(c))
c.cacheDir("dir")
assert.Equal(t, `name="" isFile=false opens=0
name="dir" isFile=false opens=0`, itemAsString(c))
c.cacheDir("dir/sub")
assert.Equal(t, `name="" isFile=false opens=0
name="dir" isFile=false opens=0
name="dir/sub" isFile=false opens=0`, itemAsString(c))
c.cacheDir("dir/sub2/subsub2")
assert.Equal(t, `name="" isFile=false opens=0
name="dir" isFile=false opens=0
name="dir/sub" isFile=false opens=0
name="dir/sub2" isFile=false opens=0
name="dir/sub2/subsub2" isFile=false opens=0`, itemAsString(c))
}
func TestCachePurgeOld(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := newCache(ctx, r.Fremote, &DefaultOpt)
require.NoError(t, err)
// Test funcs
var removed []string
removedDir := true
removeFile := func(name string) {
removed = append(removed, name)
}
removeDir := func(name string) bool {
if removedDir {
removed = append(removed, name+"/")
}
return removedDir
}
removed = nil
c._purgeOld(-10*time.Second, removeFile, removeDir)
assert.Equal(t, []string(nil), removed)
c.open("sub/dir/potato")
c.close("sub/dir/potato")
assert.Equal(t, `name="" isFile=false opens=0
name="sub" isFile=false opens=0
name="sub/dir" isFile=false opens=0
name="sub/dir/potato" isFile=true opens=0`, itemAsString(c))
removed = nil
removedDir = false
c._purgeOld(10*time.Second, removeFile, removeDir)
assert.Equal(t, []string(nil), removed)
assert.Equal(t, `name="" isFile=false opens=0
name="sub" isFile=false opens=0
name="sub/dir" isFile=false opens=0
name="sub/dir/potato" isFile=true opens=0`, itemAsString(c))
removed = nil
removedDir = true
c._purgeOld(-10*time.Second, removeFile, removeDir)
assert.Equal(t, []string{
"sub/dir/potato",
"sub/dir/",
"sub/",
"/",
}, removed)
assert.Equal(t, ``, itemAsString(c))
}

View File

@ -129,7 +129,7 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) {
} }
fh.File = fd fh.File = fd
fh.opened = true fh.opened = true
fh.d.vfs.cache.open(fh.osPath) fh.d.vfs.cache.open(fh.remote)
fh.d.addObject(fh.file) // make sure the directory has this object in it now fh.d.addObject(fh.file) // make sure the directory has this object in it now
return nil return nil
} }
@ -196,7 +196,7 @@ func (fh *RWFileHandle) close() (err error) {
fh.file.setSize(fi.Size()) fh.file.setSize(fi.Size())
} }
} }
fh.d.vfs.cache.close(fh.osPath) fh.d.vfs.cache.close(fh.remote)
// Close the underlying file // Close the underlying file
err = fh.File.Close() err = fh.File.Close()