rclone/backend/cache/cache.go
Anagh Kumar Baranwal 5a6750e1cd cache: documentation fix for cache-chunk-total-size - Fixes #2519
Signed-off-by: Anagh Kumar Baranwal <anaghk.dos@gmail.com>
2018-09-04 16:16:35 +01:00

1762 lines
50 KiB
Go

// +build !plan9
package cache
import (
"context"
"fmt"
"io"
"math"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/ncw/rclone/backend/crypt"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/config"
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
"github.com/ncw/rclone/fs/config/obscure"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fs/rc"
"github.com/ncw/rclone/fs/walk"
"github.com/ncw/rclone/lib/atexit"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
const (
// DefCacheChunkSize is the default value for chunk size
DefCacheChunkSize = fs.SizeSuffix(5 * 1024 * 1024)
// DefCacheTotalChunkSize is the default value for the maximum size of stored chunks
DefCacheTotalChunkSize = fs.SizeSuffix(10 * 1024 * 1024 * 1024)
// DefCacheChunkCleanInterval is the interval at which chunks are cleaned
DefCacheChunkCleanInterval = fs.Duration(time.Minute)
// DefCacheInfoAge is the default value for object info age
DefCacheInfoAge = fs.Duration(6 * time.Hour)
// DefCacheReadRetries is the default value for read retries
DefCacheReadRetries = 10
// DefCacheTotalWorkers is how many workers run in parallel to download chunks
DefCacheTotalWorkers = 4
// DefCacheChunkNoMemory will enable or disable in-memory storage for chunks
DefCacheChunkNoMemory = false
// DefCacheRps limits the number of requests per second to the source FS
DefCacheRps = -1
// DefCacheWrites will cache file data on writes through the cache
DefCacheWrites = false
// DefCacheTmpWaitTime says how long should files be stored in local cache before being uploaded
DefCacheTmpWaitTime = fs.Duration(15 * time.Second)
// DefCacheDbWaitTime defines how long the cache backend should wait for the DB to be available
DefCacheDbWaitTime = fs.Duration(1 * time.Second)
)
// Register with Fs
func init() {
fs.Register(&fs.RegInfo{
Name: "cache",
Description: "Cache a remote",
NewFs: NewFs,
Options: []fs.Option{{
Name: "remote",
Help: "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).",
Required: true,
}, {
Name: "plex_url",
Help: "The URL of the Plex server",
}, {
Name: "plex_username",
Help: "The username of the Plex user",
}, {
Name: "plex_password",
Help: "The password of the Plex user",
IsPassword: true,
}, {
Name: "plex_token",
Help: "The plex token for authentication - auto set normally",
Hide: fs.OptionHideBoth,
Advanced: true,
}, {
Name: "chunk_size",
Help: "The size of a chunk. Lower value good for slow connections but can affect seamless reading.",
Default: DefCacheChunkSize,
Examples: []fs.OptionExample{{
Value: "1m",
Help: "1MB",
}, {
Value: "5M",
Help: "5 MB",
}, {
Value: "10M",
Help: "10 MB",
}},
}, {
Name: "info_age",
Help: "How much time should object info (file size, file hashes etc) be stored in cache.\nUse a very high value if you don't plan on changing the source FS from outside the cache.\nAccepted units are: \"s\", \"m\", \"h\".",
Default: DefCacheInfoAge,
Examples: []fs.OptionExample{{
Value: "1h",
Help: "1 hour",
}, {
Value: "24h",
Help: "24 hours",
}, {
Value: "48h",
Help: "48 hours",
}},
}, {
Name: "chunk_total_size",
Help: "The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted.",
Default: DefCacheTotalChunkSize,
Examples: []fs.OptionExample{{
Value: "500M",
Help: "500 MB",
}, {
Value: "1G",
Help: "1 GB",
}, {
Value: "10G",
Help: "10 GB",
}},
}, {
Name: "db_path",
Default: filepath.Join(config.CacheDir, "cache-backend"),
Help: "Directory to cache DB",
Advanced: true,
}, {
Name: "chunk_path",
Default: filepath.Join(config.CacheDir, "cache-backend"),
Help: "Directory to cache chunk files",
Advanced: true,
}, {
Name: "db_purge",
Default: false,
Help: "Purge the cache DB before",
Hide: fs.OptionHideConfigurator,
Advanced: true,
}, {
Name: "chunk_clean_interval",
Default: DefCacheChunkCleanInterval,
Help: "Interval at which chunk cleanup runs",
Advanced: true,
}, {
Name: "read_retries",
Default: DefCacheReadRetries,
Help: "How many times to retry a read from a cache storage",
Advanced: true,
}, {
Name: "workers",
Default: DefCacheTotalWorkers,
Help: "How many workers should run in parallel to download chunks",
Advanced: true,
}, {
Name: "chunk_no_memory",
Default: DefCacheChunkNoMemory,
Help: "Disable the in-memory cache for storing chunks during streaming",
Advanced: true,
}, {
Name: "rps",
Default: int(DefCacheRps),
Help: "Limits the number of requests per second to the source FS. -1 disables the rate limiter",
Advanced: true,
}, {
Name: "writes",
Default: DefCacheWrites,
Help: "Will cache file data on writes through the FS",
Advanced: true,
}, {
Name: "tmp_upload_path",
Default: "",
Help: "Directory to keep temporary files until they are uploaded to the cloud storage",
Advanced: true,
}, {
Name: "tmp_wait_time",
Default: DefCacheTmpWaitTime,
Help: "How long should files be stored in local cache before being uploaded",
Advanced: true,
}, {
Name: "db_wait_time",
Default: DefCacheDbWaitTime,
Help: "How long to wait for the DB to be available - 0 is unlimited",
Advanced: true,
}},
})
}
// Options defines the configuration for this backend
type Options struct {
Remote string `config:"remote"`
PlexURL string `config:"plex_url"`
PlexUsername string `config:"plex_username"`
PlexPassword string `config:"plex_password"`
PlexToken string `config:"plex_token"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
InfoAge fs.Duration `config:"info_age"`
ChunkTotalSize fs.SizeSuffix `config:"chunk_total_size"`
DbPath string `config:"db_path"`
ChunkPath string `config:"chunk_path"`
DbPurge bool `config:"db_purge"`
ChunkCleanInterval fs.Duration `config:"chunk_clean_interval"`
ReadRetries int `config:"read_retries"`
TotalWorkers int `config:"workers"`
ChunkNoMemory bool `config:"chunk_no_memory"`
Rps int `config:"rps"`
StoreWrites bool `config:"writes"`
TempWritePath string `config:"tmp_upload_path"`
TempWaitTime fs.Duration `config:"tmp_wait_time"`
DbWaitTime fs.Duration `config:"db_wait_time"`
}
// Fs represents a wrapped fs.Fs
type Fs struct {
fs.Fs
wrapper fs.Fs
name string
root string
opt Options // parsed options
features *fs.Features // optional features
cache *Persistent
tempFs fs.Fs
lastChunkCleanup time.Time
cleanupMu sync.Mutex
rateLimiter *rate.Limiter
plexConnector *plexConnector
backgroundRunner *backgroundWriter
cleanupChan chan bool
parentsForgetFn []func(string, fs.EntryType)
notifiedRemotes map[string]bool
notifiedMu sync.Mutex
parentsForgetMu sync.Mutex
}
// parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid
func parseRootPath(path string) (string, error) {
return strings.Trim(path, "/"), nil
}
// NewFs constructs a Fs from the path, container:path
func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct
opt := new(Options)
err := configstruct.Set(m, opt)
if err != nil {
return nil, err
}
if opt.ChunkTotalSize < opt.ChunkSize*fs.SizeSuffix(opt.TotalWorkers) {
return nil, errors.Errorf("don't set cache-chunk-total-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)",
opt.ChunkTotalSize, opt.ChunkSize, opt.TotalWorkers)
}
if strings.HasPrefix(opt.Remote, name+":") {
return nil, errors.New("can't point cache remote at itself - check the value of the remote setting")
}
rpath, err := parseRootPath(rootPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to clean root path %q", rootPath)
}
remotePath := path.Join(opt.Remote, rpath)
wrappedFs, wrapErr := fs.NewFs(remotePath)
if wrapErr != nil && wrapErr != fs.ErrorIsFile {
return nil, errors.Wrapf(wrapErr, "failed to make remote %q to wrap", remotePath)
}
var fsErr error
fs.Debugf(name, "wrapped %v:%v at root %v", wrappedFs.Name(), wrappedFs.Root(), rpath)
if wrapErr == fs.ErrorIsFile {
fsErr = fs.ErrorIsFile
rpath = cleanPath(path.Dir(rpath))
}
// configure cache backend
if opt.DbPurge {
fs.Debugf(name, "Purging the DB")
}
f := &Fs{
Fs: wrappedFs,
name: name,
root: rpath,
opt: *opt,
lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30),
cleanupChan: make(chan bool, 1),
notifiedRemotes: make(map[string]bool),
}
f.rateLimiter = rate.NewLimiter(rate.Limit(float64(opt.Rps)), opt.TotalWorkers)
f.plexConnector = &plexConnector{}
if opt.PlexURL != "" {
if opt.PlexToken != "" {
f.plexConnector, err = newPlexConnectorWithToken(f, opt.PlexURL, opt.PlexToken)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", opt.PlexURL)
}
} else {
if opt.PlexPassword != "" && opt.PlexUsername != "" {
decPass, err := obscure.Reveal(opt.PlexPassword)
if err != nil {
decPass = opt.PlexPassword
}
f.plexConnector, err = newPlexConnector(f, opt.PlexURL, opt.PlexUsername, decPass, func(token string) {
m.Set("plex_token", token)
})
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", opt.PlexURL)
}
}
}
}
dbPath := f.opt.DbPath
chunkPath := f.opt.ChunkPath
// if the dbPath is non default but the chunk path is default, we overwrite the last to follow the same one as dbPath
if dbPath != filepath.Join(config.CacheDir, "cache-backend") &&
chunkPath == filepath.Join(config.CacheDir, "cache-backend") {
chunkPath = dbPath
}
if filepath.Ext(dbPath) != "" {
dbPath = filepath.Dir(dbPath)
}
if filepath.Ext(chunkPath) != "" {
chunkPath = filepath.Dir(chunkPath)
}
err = os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "failed to create cache directory %v", dbPath)
}
err = os.MkdirAll(chunkPath, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "failed to create cache directory %v", chunkPath)
}
dbPath = filepath.Join(dbPath, name+".db")
chunkPath = filepath.Join(chunkPath, name)
fs.Infof(name, "Cache DB path: %v", dbPath)
fs.Infof(name, "Cache chunk path: %v", chunkPath)
f.cache, err = GetPersistent(dbPath, chunkPath, &Features{
PurgeDb: opt.DbPurge,
DbWaitTime: time.Duration(opt.DbWaitTime),
})
if err != nil {
return nil, errors.Wrapf(err, "failed to start cache db")
}
// Trap SIGINT and SIGTERM to close the DB handle gracefully
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP)
atexit.Register(func() {
if opt.PlexURL != "" {
f.plexConnector.closeWebsocket()
}
f.StopBackgroundRunners()
})
go func() {
for {
s := <-c
if s == syscall.SIGHUP {
fs.Infof(f, "Clearing cache from signal")
f.DirCacheFlush()
}
}
}()
fs.Infof(name, "Chunk Memory: %v", !f.opt.ChunkNoMemory)
fs.Infof(name, "Chunk Size: %v", f.opt.ChunkSize)
fs.Infof(name, "Chunk Total Size: %v", f.opt.ChunkTotalSize)
fs.Infof(name, "Chunk Clean Interval: %v", f.opt.ChunkCleanInterval)
fs.Infof(name, "Workers: %v", f.opt.TotalWorkers)
fs.Infof(name, "File Age: %v", f.opt.InfoAge)
if !f.opt.StoreWrites {
fs.Infof(name, "Cache Writes: enabled")
}
if f.opt.TempWritePath != "" {
err = os.MkdirAll(f.opt.TempWritePath, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "failed to create cache directory %v", f.opt.TempWritePath)
}
f.opt.TempWritePath = filepath.ToSlash(f.opt.TempWritePath)
f.tempFs, err = fs.NewFs(f.opt.TempWritePath)
if err != nil {
return nil, errors.Wrapf(err, "failed to create temp fs: %v", err)
}
fs.Infof(name, "Upload Temp Rest Time: %v", f.opt.TempWaitTime)
fs.Infof(name, "Upload Temp FS: %v", f.opt.TempWritePath)
f.backgroundRunner, _ = initBackgroundUploader(f)
go f.backgroundRunner.run()
}
go func() {
for {
time.Sleep(time.Duration(f.opt.ChunkCleanInterval))
select {
case <-f.cleanupChan:
fs.Infof(f, "stopping cleanup")
return
default:
fs.Debugf(f, "starting cleanup")
f.CleanUpCache(false)
}
}
}()
if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil {
doChangeNotify(f.receiveChangeNotify, time.Duration(f.opt.ChunkCleanInterval))
}
f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
DuplicateFiles: false, // storage doesn't permit this
}).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs)
// override only those features that use a temp fs and it doesn't support them
//f.features.ChangeNotify = f.ChangeNotify
if f.opt.TempWritePath != "" {
if f.tempFs.Features().Copy == nil {
f.features.Copy = nil
}
if f.tempFs.Features().Move == nil {
f.features.Move = nil
}
if f.tempFs.Features().Move == nil {
f.features.Move = nil
}
if f.tempFs.Features().DirMove == nil {
f.features.DirMove = nil
}
if f.tempFs.Features().MergeDirs == nil {
f.features.MergeDirs = nil
}
}
// even if the wrapped fs doesn't support it, we still want it
f.features.DirCacheFlush = f.DirCacheFlush
rc.Add(rc.Call{
Path: "cache/expire",
Fn: f.httpExpireRemote,
Title: "Purge a remote from cache",
Help: `
Purge a remote from the cache backend. Supports either a directory or a file.
Params:
- remote = path to remote (required)
- withData = true/false to delete cached data (chunks) as well (optional)
Eg
rclone rc cache/expire remote=path/to/sub/folder/
rclone rc cache/expire remote=/ withData=true
`,
})
rc.Add(rc.Call{
Path: "cache/stats",
Fn: f.httpStats,
Title: "Get cache stats",
Help: `
Show statistics for the cache remote.
`,
})
rc.Add(rc.Call{
Path: "cache/fetch",
Fn: f.rcFetch,
Title: "Fetch file chunks",
Help: `
Ensure the specified file chunks are cached on disk.
The chunks= parameter specifies the file chunks to check.
It takes a comma separated list of array slice indices.
The slice indices are similar to Python slices: start[:end]
start is the 0 based chunk number from the beginning of the file
to fetch inclusive. end is 0 based chunk number from the beginning
of the file to fetch exclisive.
Both values can be negative, in which case they count from the back
of the file. The value "-5:" represents the last 5 chunks of a file.
Some valid examples are:
":5,-5:" -> the first and last five chunks
"0,-2" -> the first and the second last chunk
"0:10" -> the first ten chunks
Any parameter with a key that starts with "file" can be used to
specify files to fetch, eg
rclone rc cache/fetch chunks=0 file=hello file2=home/goodbye
File names will automatically be encrypted when the a crypt remote
is used on top of the cache.
`,
})
return f, fsErr
}
func (f *Fs) httpStats(in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
m, err := f.Stats()
if err != nil {
return out, errors.Errorf("error while getting cache stats")
}
out["status"] = "ok"
out["stats"] = m
return out, nil
}
func (f *Fs) unwrapRemote(remote string) string {
remote = cleanPath(remote)
if remote != "" {
// if it's wrapped by crypt we need to check what format we got
if cryptFs, yes := f.isWrappedByCrypt(); yes {
_, err := cryptFs.DecryptFileName(remote)
// if it failed to decrypt then it is a decrypted format and we need to encrypt it
if err != nil {
return cryptFs.EncryptFileName(remote)
}
// else it's an encrypted format and we can use it as it is
}
}
return remote
}
func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
remoteInt, ok := in["remote"]
if !ok {
return out, errors.Errorf("remote is needed")
}
remote := remoteInt.(string)
withData := false
_, ok = in["withData"]
if ok {
withData = true
}
remote = f.unwrapRemote(remote)
if !f.cache.HasEntry(path.Join(f.Root(), remote)) {
return out, errors.Errorf("%s doesn't exist in cache", remote)
}
co := NewObject(f, remote)
err = f.cache.GetObject(co)
if err != nil { // it could be a dir
cd := NewDirectory(f, remote)
err := f.cache.ExpireDir(cd)
if err != nil {
return out, errors.WithMessage(err, "error expiring directory")
}
// notify vfs too
f.notifyChangeUpstream(cd.Remote(), fs.EntryDirectory)
out["status"] = "ok"
out["message"] = fmt.Sprintf("cached directory cleared: %v", remote)
return out, nil
}
// expire the entry
err = f.cache.ExpireObject(co, withData)
if err != nil {
return out, errors.WithMessage(err, "error expiring file")
}
// notify vfs too
f.notifyChangeUpstream(co.Remote(), fs.EntryObject)
out["status"] = "ok"
out["message"] = fmt.Sprintf("cached file cleared: %v", remote)
return out, nil
}
func (f *Fs) rcFetch(in rc.Params) (rc.Params, error) {
type chunkRange struct {
start, end int64
}
parseChunks := func(ranges string) (crs []chunkRange, err error) {
for _, part := range strings.Split(ranges, ",") {
var start, end int64 = 0, math.MaxInt64
switch ints := strings.Split(part, ":"); len(ints) {
case 1:
start, err = strconv.ParseInt(ints[0], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid range: %q", part)
}
end = start + 1
case 2:
if ints[0] != "" {
start, err = strconv.ParseInt(ints[0], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid range: %q", part)
}
}
if ints[1] != "" {
end, err = strconv.ParseInt(ints[1], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid range: %q", part)
}
}
default:
return nil, errors.Errorf("invalid range: %q", part)
}
crs = append(crs, chunkRange{start: start, end: end})
}
return
}
walkChunkRange := func(cr chunkRange, size int64, cb func(chunk int64)) {
if size <= 0 {
return
}
chunks := (size-1)/f.ChunkSize() + 1
start, end := cr.start, cr.end
if start < 0 {
start += chunks
}
if end <= 0 {
end += chunks
}
if end <= start {
return
}
switch {
case start < 0:
start = 0
case start >= chunks:
return
}
switch {
case end <= start:
end = start + 1
case end >= chunks:
end = chunks
}
for i := start; i < end; i++ {
cb(i)
}
}
walkChunkRanges := func(crs []chunkRange, size int64, cb func(chunk int64)) {
for _, cr := range crs {
walkChunkRange(cr, size, cb)
}
}
v, ok := in["chunks"]
if !ok {
return nil, errors.New("missing chunks parameter")
}
s, ok := v.(string)
if !ok {
return nil, errors.New("invalid chunks parameter")
}
delete(in, "chunks")
crs, err := parseChunks(s)
if err != nil {
return nil, errors.Wrap(err, "invalid chunks parameter")
}
var files [][2]string
for k, v := range in {
if !strings.HasPrefix(k, "file") {
return nil, errors.Errorf("invalid parameter %s=%s", k, v)
}
switch v := v.(type) {
case string:
files = append(files, [2]string{v, f.unwrapRemote(v)})
default:
return nil, errors.Errorf("invalid parameter %s=%s", k, v)
}
}
type fileStatus struct {
Error string
FetchedChunks int
}
fetchedChunks := make(map[string]fileStatus, len(files))
for _, pair := range files {
file, remote := pair[0], pair[1]
var status fileStatus
o, err := f.NewObject(remote)
if err != nil {
fetchedChunks[file] = fileStatus{Error: err.Error()}
continue
}
co := o.(*Object)
err = co.refreshFromSource(true)
if err != nil {
fetchedChunks[file] = fileStatus{Error: err.Error()}
continue
}
handle := NewObjectHandle(co, f)
handle.UseMemory = false
handle.scaleWorkers(1)
walkChunkRanges(crs, co.Size(), func(chunk int64) {
_, err := handle.getChunk(chunk * f.ChunkSize())
if err != nil {
if status.Error == "" {
status.Error = err.Error()
}
} else {
status.FetchedChunks++
}
})
fetchedChunks[file] = status
}
return rc.Params{"status": fetchedChunks}, nil
}
// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) {
if crypt, yes := f.isWrappedByCrypt(); yes {
decryptedPath, err := crypt.DecryptFileName(forgetPath)
if err == nil {
fs.Infof(decryptedPath, "received cache expiry notification")
} else {
fs.Infof(forgetPath, "received cache expiry notification")
}
} else {
fs.Infof(forgetPath, "received cache expiry notification")
}
// notify upstreams too (vfs)
f.notifyChangeUpstream(forgetPath, entryType)
var cd *Directory
if entryType == fs.EntryObject {
co := NewObject(f, forgetPath)
err := f.cache.GetObject(co)
if err != nil {
fs.Debugf(f, "got change notification for non cached entry %v", co)
}
err = f.cache.ExpireObject(co, true)
if err != nil {
fs.Debugf(forgetPath, "notify: error expiring '%v': %v", co, err)
}
cd = NewDirectory(f, cleanPath(path.Dir(co.Remote())))
} else {
cd = NewDirectory(f, forgetPath)
}
// we expire the dir
err := f.cache.ExpireDir(cd)
if err != nil {
fs.Debugf(forgetPath, "notify: error expiring '%v': %v", cd, err)
} else {
fs.Debugf(forgetPath, "notify: expired '%v'", cd)
}
f.notifiedMu.Lock()
defer f.notifiedMu.Unlock()
f.notifiedRemotes[forgetPath] = true
f.notifiedRemotes[cd.Remote()] = true
}
// notifyChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on changes
// or if we use a temp fs
func (f *Fs) notifyChangeUpstreamIfNeeded(remote string, entryType fs.EntryType) {
if f.Fs.Features().ChangeNotify == nil || f.opt.TempWritePath != "" {
f.notifyChangeUpstream(remote, entryType)
}
}
// notifyChangeUpstream will loop through all the upstreams and notify
// of the provided remote (should be only a dir)
func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) {
f.parentsForgetMu.Lock()
defer f.parentsForgetMu.Unlock()
if len(f.parentsForgetFn) > 0 {
for _, fn := range f.parentsForgetFn {
fn(remote, entryType)
}
}
}
// ChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs ChangeNotify (if it supports it)
// and also notifies other caches (i.e VFS) to clear out whenever something changes
func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool {
f.parentsForgetMu.Lock()
defer f.parentsForgetMu.Unlock()
fs.Debugf(f, "subscribing to ChangeNotify")
f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc)
return make(chan bool)
}
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
return f.root
}
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
return f.features
}
// String returns a description of the FS
func (f *Fs) String() string {
return fmt.Sprintf("Cache remote %s:%s", f.name, f.root)
}
// ChunkSize returns the configured chunk size
func (f *Fs) ChunkSize() int64 {
return int64(f.opt.ChunkSize)
}
// InfoAge returns the configured file age
func (f *Fs) InfoAge() time.Duration {
return time.Duration(f.opt.InfoAge)
}
// TempUploadWaitTime returns the configured temp file upload wait time
func (f *Fs) TempUploadWaitTime() time.Duration {
return time.Duration(f.opt.TempWaitTime)
}
// NewObject finds the Object at remote.
func (f *Fs) NewObject(remote string) (fs.Object, error) {
var err error
fs.Debugf(f, "new object '%s'", remote)
co := NewObject(f, remote)
// search for entry in cache and validate it
err = f.cache.GetObject(co)
if err != nil {
fs.Debugf(remote, "find: error: %v", err)
} else if time.Now().After(co.CacheTs.Add(time.Duration(f.opt.InfoAge))) {
fs.Debugf(co, "find: cold object: %+v", co)
} else {
fs.Debugf(co, "find: warm object: %v, expiring on: %v", co, co.CacheTs.Add(time.Duration(f.opt.InfoAge)))
return co, nil
}
// search for entry in source or temp fs
var obj fs.Object
if f.opt.TempWritePath != "" {
obj, err = f.tempFs.NewObject(remote)
// not found in temp fs
if err != nil {
fs.Debugf(remote, "find: not found in local cache fs")
obj, err = f.Fs.NewObject(remote)
} else {
fs.Debugf(obj, "find: found in local cache fs")
}
} else {
obj, err = f.Fs.NewObject(remote)
}
// not found in either fs
if err != nil {
fs.Debugf(obj, "find failed: not found in either local or remote fs")
return nil, err
}
// cache the new entry
co = ObjectFromOriginal(f, obj).persist()
fs.Debugf(co, "find: cached object")
return co, nil
}
// List the objects and directories in dir into entries
func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
fs.Debugf(f, "list '%s'", dir)
cd := ShallowDirectory(f, dir)
// search for cached dir entries and validate them
entries, err = f.cache.GetDirEntries(cd)
if err != nil {
fs.Debugf(dir, "list: error: %v", err)
} else if time.Now().After(cd.CacheTs.Add(time.Duration(f.opt.InfoAge))) {
fs.Debugf(dir, "list: cold listing: %v", cd.CacheTs)
} else if len(entries) == 0 {
// TODO: read empty dirs from source?
fs.Debugf(dir, "list: empty listing")
} else {
fs.Debugf(dir, "list: warm %v from cache for: %v, expiring on: %v", len(entries), cd.abs(), cd.CacheTs.Add(time.Duration(f.opt.InfoAge)))
fs.Debugf(dir, "list: cached entries: %v", entries)
return entries, nil
}
// FIXME need to clean existing cached listing
// we first search any temporary files stored locally
var cachedEntries fs.DirEntries
if f.opt.TempWritePath != "" {
queuedEntries, err := f.cache.searchPendingUploadFromDir(cd.abs())
if err != nil {
fs.Errorf(dir, "list: error getting pending uploads: %v", err)
} else {
fs.Debugf(dir, "list: read %v from temp fs", len(queuedEntries))
fs.Debugf(dir, "list: temp fs entries: %v", queuedEntries)
for _, queuedRemote := range queuedEntries {
queuedEntry, err := f.tempFs.NewObject(f.cleanRootFromPath(queuedRemote))
if err != nil {
fs.Debugf(dir, "list: temp file not found in local fs: %v", err)
continue
}
co := ObjectFromOriginal(f, queuedEntry).persist()
fs.Debugf(co, "list: cached temp object")
cachedEntries = append(cachedEntries, co)
}
}
}
// search from the source
entries, err = f.Fs.List(dir)
if err != nil {
return nil, err
}
fs.Debugf(dir, "list: read %v from source", len(entries))
fs.Debugf(dir, "list: source entries: %v", entries)
// and then iterate over the ones from source (temp Objects will override source ones)
var batchDirectories []*Directory
for _, entry := range entries {
switch o := entry.(type) {
case fs.Object:
// skip over temporary objects (might be uploading)
found := false
for _, t := range cachedEntries {
if t.Remote() == o.Remote() {
found = true
break
}
}
if found {
continue
}
co := ObjectFromOriginal(f, o).persist()
cachedEntries = append(cachedEntries, co)
fs.Debugf(dir, "list: cached object: %v", co)
case fs.Directory:
cdd := DirectoryFromOriginal(f, o)
// check if the dir isn't expired and add it in cache if it isn't
if cdd2, err := f.cache.GetDir(cdd.abs()); err != nil || time.Now().Before(cdd2.CacheTs.Add(time.Duration(f.opt.InfoAge))) {
batchDirectories = append(batchDirectories, cdd)
}
cachedEntries = append(cachedEntries, cdd)
default:
fs.Debugf(entry, "list: Unknown object type %T", entry)
}
}
err = f.cache.AddBatchDir(batchDirectories)
if err != nil {
fs.Errorf(dir, "list: error caching directories from listing %v", dir)
} else {
fs.Debugf(dir, "list: cached directories: %v", len(batchDirectories))
}
// cache dir meta
t := time.Now()
cd.CacheTs = &t
err = f.cache.AddDir(cd)
if err != nil {
fs.Errorf(cd, "list: save error: '%v'", err)
} else {
fs.Debugf(dir, "list: cached dir: '%v', cache ts: %v", cd.abs(), cd.CacheTs)
}
return cachedEntries, nil
}
func (f *Fs) recurse(dir string, list *walk.ListRHelper) error {
entries, err := f.List(dir)
if err != nil {
return err
}
for i := 0; i < len(entries); i++ {
innerDir, ok := entries[i].(fs.Directory)
if ok {
err := f.recurse(innerDir.Remote(), list)
if err != nil {
return err
}
}
err := list.Add(entries[i])
if err != nil {
return err
}
}
return nil
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
fs.Debugf(f, "list recursively from '%s'", dir)
// we check if the source FS supports ListR
// if it does, we'll use that to get all the entries, cache them and return
do := f.Fs.Features().ListR
if do != nil {
return do(dir, func(entries fs.DirEntries) error {
// we got called back with a set of entries so let's cache them and call the original callback
for _, entry := range entries {
switch o := entry.(type) {
case fs.Object:
_ = f.cache.AddObject(ObjectFromOriginal(f, o))
case fs.Directory:
_ = f.cache.AddDir(DirectoryFromOriginal(f, o))
default:
return errors.Errorf("Unknown object type %T", entry)
}
}
// call the original callback
return callback(entries)
})
}
// if we're here, we're gonna do a standard recursive traversal and cache everything
list := walk.NewListRHelper(callback)
err = f.recurse(dir, list)
if err != nil {
return err
}
return list.Flush()
}
// Mkdir makes the directory (container, bucket)
func (f *Fs) Mkdir(dir string) error {
fs.Debugf(f, "mkdir '%s'", dir)
err := f.Fs.Mkdir(dir)
if err != nil {
return err
}
fs.Debugf(dir, "mkdir: created dir in source fs")
cd := NewDirectory(f, cleanPath(dir))
err = f.cache.AddDir(cd)
if err != nil {
fs.Errorf(dir, "mkdir: add error: %v", err)
} else {
fs.Debugf(cd, "mkdir: added to cache")
}
// expire parent of new dir
parentCd := NewDirectory(f, cleanPath(path.Dir(dir)))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "mkdir: cache expire error: %v", err)
} else {
fs.Infof(parentCd, "mkdir: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return nil
}
// Rmdir removes the directory (container, bucket) if empty
func (f *Fs) Rmdir(dir string) error {
fs.Debugf(f, "rmdir '%s'", dir)
if f.opt.TempWritePath != "" {
// pause background uploads
f.backgroundRunner.pause()
defer f.backgroundRunner.play()
// we check if the source exists on the remote and make the same move on it too if it does
// otherwise, we skip this step
_, err := f.UnWrap().List(dir)
if err == nil {
err := f.Fs.Rmdir(dir)
if err != nil {
return err
}
fs.Debugf(dir, "rmdir: removed dir in source fs")
}
var queuedEntries []*Object
err = walk.Walk(f.tempFs, dir, true, -1, func(path string, entries fs.DirEntries, err error) error {
for _, o := range entries {
if oo, ok := o.(fs.Object); ok {
co := ObjectFromOriginal(f, oo)
queuedEntries = append(queuedEntries, co)
}
}
return nil
})
if err != nil {
fs.Errorf(dir, "rmdir: error getting pending uploads: %v", err)
} else {
fs.Debugf(dir, "rmdir: read %v from temp fs", len(queuedEntries))
fs.Debugf(dir, "rmdir: temp fs entries: %v", queuedEntries)
if len(queuedEntries) > 0 {
fs.Errorf(dir, "rmdir: temporary dir not empty: %v", queuedEntries)
return fs.ErrorDirectoryNotEmpty
}
}
} else {
err := f.Fs.Rmdir(dir)
if err != nil {
return err
}
fs.Debugf(dir, "rmdir: removed dir in source fs")
}
// remove dir data
d := NewDirectory(f, dir)
err := f.cache.RemoveDir(d.abs())
if err != nil {
fs.Errorf(dir, "rmdir: remove error: %v", err)
} else {
fs.Debugf(d, "rmdir: removed from cache")
}
// expire parent
parentCd := NewDirectory(f, cleanPath(path.Dir(dir)))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(dir, "rmdir: cache expire error: %v", err)
} else {
fs.Infof(parentCd, "rmdir: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server side move operations.
func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
fs.Debugf(f, "move dir '%s'/'%s' -> '%s'/'%s'", src.Root(), srcRemote, f.Root(), dstRemote)
do := f.Fs.Features().DirMove
if do == nil {
return fs.ErrorCantDirMove
}
srcFs, ok := src.(*Fs)
if !ok {
fs.Errorf(srcFs, "can't move directory - not same remote type")
return fs.ErrorCantDirMove
}
if srcFs.Fs.Name() != f.Fs.Name() {
fs.Errorf(srcFs, "can't move directory - not wrapping same remotes")
return fs.ErrorCantDirMove
}
if f.opt.TempWritePath != "" {
// pause background uploads
f.backgroundRunner.pause()
defer f.backgroundRunner.play()
_, errInWrap := srcFs.UnWrap().List(srcRemote)
_, errInTemp := f.tempFs.List(srcRemote)
// not found in either fs
if errInWrap != nil && errInTemp != nil {
return fs.ErrorDirNotFound
}
// we check if the source exists on the remote and make the same move on it too if it does
// otherwise, we skip this step
if errInWrap == nil {
err := do(srcFs.UnWrap(), srcRemote, dstRemote)
if err != nil {
return err
}
fs.Debugf(srcRemote, "movedir: dir moved in the source fs")
}
// we need to check if the directory exists in the temp fs
// and skip the move if it doesn't
if errInTemp != nil {
goto cleanup
}
var queuedEntries []*Object
err := walk.Walk(f.tempFs, srcRemote, true, -1, func(path string, entries fs.DirEntries, err error) error {
for _, o := range entries {
if oo, ok := o.(fs.Object); ok {
co := ObjectFromOriginal(f, oo)
queuedEntries = append(queuedEntries, co)
if co.tempFileStartedUpload() {
fs.Errorf(co, "can't move - upload has already started. need to finish that")
return fs.ErrorCantDirMove
}
}
}
return nil
})
if err != nil {
return err
}
fs.Debugf(srcRemote, "dirmove: read %v from temp fs", len(queuedEntries))
fs.Debugf(srcRemote, "dirmove: temp fs entries: %v", queuedEntries)
do := f.tempFs.Features().DirMove
if do == nil {
fs.Errorf(srcRemote, "dirmove: can't move dir in temp fs")
return fs.ErrorCantDirMove
}
err = do(f.tempFs, srcRemote, dstRemote)
if err != nil {
return err
}
err = f.cache.ReconcileTempUploads(f)
if err != nil {
return err
}
} else {
err := do(srcFs.UnWrap(), srcRemote, dstRemote)
if err != nil {
return err
}
fs.Debugf(srcRemote, "movedir: dir moved in the source fs")
}
cleanup:
// delete src dir from cache along with all chunks
srcDir := NewDirectory(srcFs, srcRemote)
err := f.cache.RemoveDir(srcDir.abs())
if err != nil {
fs.Errorf(srcDir, "dirmove: remove error: %v", err)
} else {
fs.Debugf(srcDir, "dirmove: removed cached dir")
}
// expire src parent
srcParent := NewDirectory(f, cleanPath(path.Dir(srcRemote)))
err = f.cache.ExpireDir(srcParent)
if err != nil {
fs.Errorf(srcParent, "dirmove: cache expire error: %v", err)
} else {
fs.Debugf(srcParent, "dirmove: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory)
// expire parent dir at the destination path
dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote)))
err = f.cache.ExpireDir(dstParent)
if err != nil {
fs.Errorf(dstParent, "dirmove: cache expire error: %v", err)
} else {
fs.Debugf(dstParent, "dirmove: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(dstParent.Remote(), fs.EntryDirectory)
// TODO: precache dst dir and save the chunks
return nil
}
// cacheReader will split the stream of a reader to be cached at the same time it is read by the original source
func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader)) {
// create the pipe and tee reader
pr, pw := io.Pipe()
tr := io.TeeReader(u, pw)
// create channel to synchronize
done := make(chan bool)
defer close(done)
go func() {
// notify the cache reader that we're complete after the source FS finishes
defer func() {
_ = pw.Close()
}()
// process original reading
originalRead(tr)
// signal complete
done <- true
}()
go func() {
var offset int64
for {
chunk := make([]byte, f.opt.ChunkSize)
readSize, err := io.ReadFull(pr, chunk)
// we ignore 3 failures which are ok:
// 1. EOF - original reading finished and we got a full buffer too
// 2. ErrUnexpectedEOF - original reading finished and partial buffer
// 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too
// if we have a different error: we're going to error out the original reading too and stop this
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe {
fs.Errorf(src, "error saving new data in cache. offset: %v, err: %v", offset, err)
_ = pr.CloseWithError(err)
break
}
// if we have some bytes we cache them
if readSize > 0 {
chunk = chunk[:readSize]
err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset)
if err2 != nil {
fs.Errorf(src, "error saving new data in cache '%v'", err2)
_ = pr.CloseWithError(err2)
break
}
offset += int64(readSize)
}
// stuff should be closed but let's be sure
if err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe {
_ = pr.Close()
break
}
}
// signal complete
done <- true
}()
// wait until both are done
for c := 0; c < 2; c++ {
<-done
}
}
type putFn func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
// put in to the remote path
func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) {
var err error
var obj fs.Object
// queue for upload and store in temp fs if configured
if f.opt.TempWritePath != "" {
// we need to clear the caches before a put through temp fs
parentCd := NewDirectory(f, cleanPath(path.Dir(src.Remote())))
_ = f.cache.ExpireDir(parentCd)
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
obj, err = f.tempFs.Put(in, src, options...)
if err != nil {
fs.Errorf(obj, "put: failed to upload in temp fs: %v", err)
return nil, err
}
fs.Infof(obj, "put: uploaded in temp fs")
err = f.cache.addPendingUpload(path.Join(f.Root(), src.Remote()), false)
if err != nil {
fs.Errorf(obj, "put: failed to queue for upload: %v", err)
return nil, err
}
fs.Infof(obj, "put: queued for upload")
// if cache writes is enabled write it first through cache
} else if f.opt.StoreWrites {
f.cacheReader(in, src, func(inn io.Reader) {
obj, err = put(inn, src, options...)
})
if err == nil {
fs.Debugf(obj, "put: uploaded to remote fs and saved in cache")
}
// last option: save it directly in remote fs
} else {
obj, err = put(in, src, options...)
if err == nil {
fs.Debugf(obj, "put: uploaded to remote fs")
}
}
// validate and stop if errors are found
if err != nil {
fs.Errorf(src, "put: error uploading: %v", err)
return nil, err
}
// cache the new file
cachedObj := ObjectFromOriginal(f, obj)
// deleting cached chunks and info to be replaced with new ones
_ = f.cache.RemoveObject(cachedObj.abs())
cachedObj.persist()
fs.Debugf(cachedObj, "put: added to cache")
// expire parent
parentCd := NewDirectory(f, cleanPath(path.Dir(cachedObj.Remote())))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(cachedObj, "put: cache expire error: %v", err)
} else {
fs.Infof(parentCd, "put: cache expired")
}
// advertise to ChangeNotify
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return cachedObj, nil
}
// Put in to the remote path with the modTime given of the given size
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
fs.Debugf(f, "put data at '%s'", src.Remote())
return f.put(in, src, options, f.Fs.Put)
}
// PutUnchecked uploads the object
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Fs.Features().PutUnchecked
if do == nil {
return nil, errors.New("can't PutUnchecked")
}
fs.Debugf(f, "put data unchecked in '%s'", src.Remote())
return f.put(in, src, options, do)
}
// PutStream uploads the object
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Fs.Features().PutStream
if do == nil {
return nil, errors.New("can't PutStream")
}
fs.Debugf(f, "put data streaming in '%s'", src.Remote())
return f.put(in, src, options, do)
}
// Copy src to this remote using server side copy operations.
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
fs.Debugf(f, "copy obj '%s' -> '%s'", src, remote)
do := f.Fs.Features().Copy
if do == nil {
fs.Errorf(src, "source remote (%v) doesn't support Copy", src.Fs())
return nil, fs.ErrorCantCopy
}
// the source must be a cached object or we abort
srcObj, ok := src.(*Object)
if !ok {
fs.Errorf(srcObj, "can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
// both the source cache fs and this cache fs need to wrap the same remote
if srcObj.CacheFs.Fs.Name() != f.Fs.Name() {
fs.Errorf(srcObj, "can't copy - not wrapping same remotes")
return nil, fs.ErrorCantCopy
}
// refresh from source or abort
if err := srcObj.refreshFromSource(false); err != nil {
fs.Errorf(f, "can't copy %v - %v", src, err)
return nil, fs.ErrorCantCopy
}
if srcObj.isTempFile() {
// we check if the feature is stil active
if f.opt.TempWritePath == "" {
fs.Errorf(srcObj, "can't copy - this is a local cached file but this feature is turned off this run")
return nil, fs.ErrorCantCopy
}
do = srcObj.ParentFs.Features().Copy
if do == nil {
fs.Errorf(src, "parent remote (%v) doesn't support Copy", srcObj.ParentFs)
return nil, fs.ErrorCantCopy
}
}
obj, err := do(srcObj.Object, remote)
if err != nil {
fs.Errorf(srcObj, "error moving in cache: %v", err)
return nil, err
}
fs.Debugf(obj, "copy: file copied")
// persist new
co := ObjectFromOriginal(f, obj).persist()
fs.Debugf(co, "copy: added to cache")
// expire the destination path
parentCd := NewDirectory(f, cleanPath(path.Dir(co.Remote())))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "copy: cache expire error: %v", err)
} else {
fs.Infof(parentCd, "copy: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
// expire src parent
srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote())))
err = f.cache.ExpireDir(srcParent)
if err != nil {
fs.Errorf(srcParent, "copy: cache expire error: %v", err)
} else {
fs.Infof(srcParent, "copy: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory)
return co, nil
}
// Move src to this remote using server side move operations.
func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
fs.Debugf(f, "moving obj '%s' -> %s", src, remote)
// if source fs doesn't support move abort
do := f.Fs.Features().Move
if do == nil {
fs.Errorf(src, "source remote (%v) doesn't support Move", src.Fs())
return nil, fs.ErrorCantMove
}
// the source must be a cached object or we abort
srcObj, ok := src.(*Object)
if !ok {
fs.Errorf(srcObj, "can't move - not same remote type")
return nil, fs.ErrorCantMove
}
// both the source cache fs and this cache fs need to wrap the same remote
if srcObj.CacheFs.Fs.Name() != f.Fs.Name() {
fs.Errorf(srcObj, "can't move - not wrapping same remote types")
return nil, fs.ErrorCantMove
}
// refresh from source or abort
if err := srcObj.refreshFromSource(false); err != nil {
fs.Errorf(f, "can't move %v - %v", src, err)
return nil, fs.ErrorCantMove
}
// if this is a temp object then we perform the changes locally
if srcObj.isTempFile() {
// we check if the feature is stil active
if f.opt.TempWritePath == "" {
fs.Errorf(srcObj, "can't move - this is a local cached file but this feature is turned off this run")
return nil, fs.ErrorCantMove
}
// pause background uploads
f.backgroundRunner.pause()
defer f.backgroundRunner.play()
// started uploads can't be moved until they complete
if srcObj.tempFileStartedUpload() {
fs.Errorf(srcObj, "can't move - upload has already started. need to finish that")
return nil, fs.ErrorCantMove
}
do = f.tempFs.Features().Move
// we must also update the pending queue
err := f.cache.updatePendingUpload(srcObj.abs(), func(item *tempUploadInfo) error {
item.DestPath = path.Join(f.Root(), remote)
item.AddedOn = time.Now()
return nil
})
if err != nil {
fs.Errorf(srcObj, "failed to rename queued file for upload: %v", err)
return nil, fs.ErrorCantMove
}
fs.Debugf(srcObj, "move: queued file moved to %v", remote)
}
obj, err := do(srcObj.Object, remote)
if err != nil {
fs.Errorf(srcObj, "error moving: %v", err)
return nil, err
}
fs.Debugf(obj, "move: file moved")
// remove old
err = f.cache.RemoveObject(srcObj.abs())
if err != nil {
fs.Errorf(srcObj, "move: remove error: %v", err)
} else {
fs.Debugf(srcObj, "move: removed from cache")
}
// expire old parent
parentCd := NewDirectory(f, cleanPath(path.Dir(srcObj.Remote())))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "move: parent cache expire error: %v", err)
} else {
fs.Infof(parentCd, "move: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
// persist new
cachedObj := ObjectFromOriginal(f, obj).persist()
fs.Debugf(cachedObj, "move: added to cache")
// expire new parent
parentCd = NewDirectory(f, cleanPath(path.Dir(cachedObj.Remote())))
err = f.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "move: expire error: %v", err)
} else {
fs.Infof(parentCd, "move: cache expired")
}
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return cachedObj, nil
}
// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
return f.Fs.Hashes()
}
// Purge all files in the root and the root directory
func (f *Fs) Purge() error {
fs.Infof(f, "purging cache")
f.cache.Purge()
do := f.Fs.Features().Purge
if do == nil {
return nil
}
err := do()
if err != nil {
return err
}
return nil
}
// CleanUp the trash in the Fs
func (f *Fs) CleanUp() error {
f.CleanUpCache(false)
do := f.Fs.Features().CleanUp
if do == nil {
return nil
}
return do()
}
// About gets quota information from the Fs
func (f *Fs) About() (*fs.Usage, error) {
do := f.Fs.Features().About
if do == nil {
return nil, errors.New("About not supported")
}
return do()
}
// Stats returns stats about the cache storage
func (f *Fs) Stats() (map[string]map[string]interface{}, error) {
return f.cache.Stats()
}
// openRateLimited will execute a closure under a rate limiter watch
func (f *Fs) openRateLimited(fn func() (io.ReadCloser, error)) (io.ReadCloser, error) {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
start := time.Now()
if err = f.rateLimiter.Wait(ctx); err != nil {
return nil, err
}
elapsed := time.Since(start)
if elapsed > time.Second*2 {
fs.Debugf(f, "rate limited: %s", elapsed)
}
return fn()
}
// CleanUpCache will cleanup only the cache data that is expired
func (f *Fs) CleanUpCache(ignoreLastTs bool) {
f.cleanupMu.Lock()
defer f.cleanupMu.Unlock()
if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(time.Duration(f.opt.ChunkCleanInterval))) {
f.cache.CleanChunksBySize(int64(f.opt.ChunkTotalSize))
f.lastChunkCleanup = time.Now()
}
}
// StopBackgroundRunners will signall all the runners to stop their work
// can be triggered from a terminate signal or from testing between runs
func (f *Fs) StopBackgroundRunners() {
f.cleanupChan <- false
if f.opt.TempWritePath != "" && f.backgroundRunner != nil && f.backgroundRunner.isRunning() {
f.backgroundRunner.close()
}
f.cache.Close()
fs.Debugf(f, "Services stopped")
}
// UnWrap returns the Fs that this Fs is wrapping
func (f *Fs) UnWrap() fs.Fs {
return f.Fs
}
// WrapFs returns the Fs that is wrapping this Fs
func (f *Fs) WrapFs() fs.Fs {
return f.wrapper
}
// SetWrapper sets the Fs that is wrapping this Fs
func (f *Fs) SetWrapper(wrapper fs.Fs) {
f.wrapper = wrapper
}
// isWrappedByCrypt checks if this is wrapped by a crypt remote
func (f *Fs) isWrappedByCrypt() (*crypt.Fs, bool) {
if f.wrapper == nil {
return nil, false
}
c, ok := f.wrapper.(*crypt.Fs)
return c, ok
}
// cleanRootFromPath trims the root of the current fs from a path
func (f *Fs) cleanRootFromPath(p string) string {
if f.Root() != "" {
p = p[len(f.Root()):] // trim out root
if len(p) > 0 { // remove first separator
p = p[1:]
}
}
return p
}
func (f *Fs) isRootInPath(p string) bool {
if f.Root() == "" {
return true
}
return strings.HasPrefix(p, f.Root()+"/")
}
// DirCacheFlush flushes the dir cache
func (f *Fs) DirCacheFlush() {
_ = f.cache.RemoveDir("")
}
// GetBackgroundUploadChannel returns a channel that can be listened to for remote activities that happen
// in the background
func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState {
if f.opt.TempWritePath != "" {
return f.backgroundRunner.notifyCh
}
return nil
}
func (f *Fs) isNotifiedRemote(remote string) bool {
f.notifiedMu.Lock()
defer f.notifiedMu.Unlock()
n, ok := f.notifiedRemotes[remote]
if !ok || !n {
return false
}
delete(f.notifiedRemotes, remote)
return n
}
func cleanPath(p string) string {
p = path.Clean(p)
if p == "." || p == "/" {
p = ""
}
return p
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil)
_ fs.Wrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
)