diff --git a/fs/rc/cache.go b/fs/rc/cache.go index 6d7cf48bd..af3aac230 100644 --- a/fs/rc/cache.go +++ b/fs/rc/cache.go @@ -4,29 +4,66 @@ package rc import ( "sync" + "time" "github.com/ncw/rclone/fs" ) var ( - fsCacheMu sync.Mutex - fsCache = map[string]fs.Fs{} - fsNewFs = fs.NewFs // for tests + fsCacheMu sync.Mutex + fsCache = map[string]*cacheEntry{} + fsNewFs = fs.NewFs // for tests + expireRunning = false + cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this + cacheExpireInterval = 60 * time.Second // interval to run the cache expire ) +type cacheEntry struct { + f fs.Fs + fsString string + lastUsed time.Time +} + // GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh func GetCachedFs(fsString string) (f fs.Fs, err error) { fsCacheMu.Lock() defer fsCacheMu.Unlock() - - f = fsCache[fsString] - if f == nil { + entry, ok := fsCache[fsString] + if !ok { f, err = fsNewFs(fsString) - if err == nil { - fsCache[fsString] = f + if err != nil { + return nil, err + } + entry = &cacheEntry{ + f: f, + fsString: fsString, + } + fsCache[fsString] = entry + } + entry.lastUsed = time.Now() + if !expireRunning { + time.AfterFunc(cacheExpireInterval, cacheExpire) + expireRunning = true + } + return entry.f, err +} + +// cacheExpire expires any entries that haven't been used recently +func cacheExpire() { + fsCacheMu.Lock() + defer fsCacheMu.Unlock() + now := time.Now() + for fsString, entry := range fsCache { + if now.Sub(entry.lastUsed) > cacheExpireDuration { + delete(fsCache, fsString) } } - return f, err + if len(fsCache) != 0 { + time.AfterFunc(cacheExpireInterval, cacheExpire) + expireRunning = true + } else { + expireRunning = false + } } // GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh diff --git a/fs/rc/cache_test.go b/fs/rc/cache_test.go index 07c696bca..6b9bab6be 100644 --- a/fs/rc/cache_test.go +++ b/fs/rc/cache_test.go @@ -2,6 +2,7 @@ package rc import ( "testing" + "time" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest/mockfs" @@ -22,7 +23,10 @@ func mockNewFs(t *testing.T) func() { } return func() { fsNewFs = oldFsNewFs - fsCache = map[string]fs.Fs{} + fsCacheMu.Lock() + fsCache = map[string]*cacheEntry{} + expireRunning = false + fsCacheMu.Unlock() } } @@ -42,6 +46,33 @@ func TestGetCachedFs(t *testing.T) { assert.Equal(t, f, f2) } +func TestCacheExpire(t *testing.T) { + defer mockNewFs(t)() + + cacheExpireInterval = time.Millisecond + assert.Equal(t, false, expireRunning) + + _, err := GetCachedFs("/") + require.NoError(t, err) + + fsCacheMu.Lock() + entry := fsCache["/"] + + assert.Equal(t, 1, len(fsCache)) + fsCacheMu.Unlock() + cacheExpire() + fsCacheMu.Lock() + assert.Equal(t, 1, len(fsCache)) + entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second) + assert.Equal(t, true, expireRunning) + fsCacheMu.Unlock() + time.Sleep(10 * time.Millisecond) + fsCacheMu.Lock() + assert.Equal(t, false, expireRunning) + assert.Equal(t, 0, len(fsCache)) + fsCacheMu.Unlock() +} + func TestGetFsNamed(t *testing.T) { defer mockNewFs(t)() diff --git a/fs/rc/job_test.go b/fs/rc/job_test.go index 5c9b56446..3017057f8 100644 --- a/fs/rc/job_test.go +++ b/fs/rc/job_test.go @@ -1,6 +1,7 @@ package rc import ( + "runtime" "testing" "time" @@ -19,9 +20,13 @@ func TestJobsKickExpire(t *testing.T) { jobs.expireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) jobs.kickExpire() + jobs.mu.Lock() assert.Equal(t, true, jobs.expireRunning) + jobs.mu.Unlock() time.Sleep(10 * time.Millisecond) + jobs.mu.Lock() assert.Equal(t, false, jobs.expireRunning) + jobs.mu.Unlock() } func TestJobsExpire(t *testing.T) { @@ -37,11 +42,15 @@ func TestJobsExpire(t *testing.T) { assert.Equal(t, 1, len(jobs.jobs)) jobs.Expire() assert.Equal(t, 1, len(jobs.jobs)) + jobs.mu.Lock() job.EndTime = time.Now().Add(-expireDuration - 60*time.Second) assert.Equal(t, true, jobs.expireRunning) + jobs.mu.Unlock() time.Sleep(10 * time.Millisecond) + jobs.mu.Lock() assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, 0, len(jobs.jobs)) + jobs.mu.Unlock() } var noopFn = func(in Params) (Params, error) { @@ -127,13 +136,27 @@ func TestJobRunPanic(t *testing.T) { jobs := newJobs() job := jobs.NewJob(boom, Params{}) <-wait + runtime.Gosched() // yield to make sure job is updated + // Wait a short time for the panic to propagate + for i := uint(0); i < 10; i++ { + job.mu.Lock() + e := job.Error + job.mu.Unlock() + if e != "" { + break + } + time.Sleep(time.Millisecond << i) + } + + job.mu.Lock() assert.Equal(t, false, job.EndTime.IsZero()) assert.Equal(t, Params{}, job.Output) assert.NotEqual(t, 0.0, job.Duration) assert.Equal(t, "panic received: boom", job.Error) assert.Equal(t, false, job.Success) assert.Equal(t, true, job.Finished) + job.mu.Unlock() } func TestJobsNewJob(t *testing.T) {