From 59acb9dfa95115072ff8dc1d8c7e0d332ed093da Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 20 Jun 2024 15:34:26 +0100 Subject: [PATCH] rc: add vfs/queue to show the status of the upload queue --- vfs/rc.go | 48 ++++++++++++++++++++++ vfs/vfscache/cache.go | 7 ++++ vfs/vfscache/cache_test.go | 14 +++++++ vfs/vfscache/writeback/writeback.go | 45 ++++++++++++++++++++ vfs/vfscache/writeback/writeback_test.go | 52 ++++++++++++++++++++++++ 5 files changed, 166 insertions(+) diff --git a/vfs/rc.go b/vfs/rc.go index 6e84c26f9..12b7df051 100644 --- a/vfs/rc.go +++ b/vfs/rc.go @@ -437,3 +437,51 @@ func rcStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { } return vfs.Stats(), nil } + +func init() { + rc.Add(rc.Call{ + Path: "vfs/queue", + Title: "Queue info for a VFS.", + Help: strings.ReplaceAll(` +This returns info about the upload queue for the selected VFS. + +This is only useful if |--vfs-cache-mode| > off. If you call it when +the |--vfs-cache-mode| is off, it will return an empty result. + + { + "queued": // an array of files queued for upload + [ + { + "name": "file", // string: name (full path) of the file, + "id": 123, // integer: id of this item in the queue, + "size": 79, // integer: size of the file in bytes + "expiry": 1.5 // float: time until file is eligible for transfer, lowest goes first + "tries": 1, // integer: number of times we have tried to upload + "delay": 5.0, // float: seconds between upload attempts + "uploading": false, // boolean: true if item is being uploaded + }, + ], + } + +The |expiry| time is the time until the file is elegible for being +uploaded in floating point seconds. This may go negative. As rclone +only transfers |--transfers| files at once, only the lowest +|--transfers| expiry times will have |uploading| as |true|. So there +may be files with negative expiry times for which |uploading| is +|false|. + +`, "|", "`") + getVFSHelp, + Fn: rcQueue, + }) +} + +func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) { + vfs, err := getVFS(in) + if err != nil { + return nil, err + } + if vfs.cache == nil { + return nil, nil + } + return vfs.cache.Queue(), nil +} diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go index 3d34dd82a..dbd408cae 100644 --- a/vfs/vfscache/cache.go +++ b/vfs/vfscache/cache.go @@ -170,6 +170,13 @@ func (c *Cache) Stats() (out rc.Params) { return out } +// Queue returns info about the Cache +func (c *Cache) Queue() (out rc.Params) { + out = make(rc.Params) + out["queue"] = c.writeback.Queue() + return out +} + // createDir creates a directory path, along with any necessary parents func createDir(dir string) error { return file.MkdirAll(dir, 0700) diff --git a/vfs/vfscache/cache_test.go b/vfs/vfscache/cache_test.go index 61357c2f9..20ebee2e7 100644 --- a/vfs/vfscache/cache_test.go +++ b/vfs/vfscache/cache_test.go @@ -14,6 +14,7 @@ import ( "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/lib/diskusage" + "github.com/rclone/rclone/vfs/vfscache/writeback" "github.com/rclone/rclone/vfs/vfscommon" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -727,3 +728,16 @@ func TestCacheStats(t *testing.T) { assert.Equal(t, 0, out["uploadsInProgress"]) assert.Equal(t, 0, out["uploadsQueued"]) } + +func TestCacheQueue(t *testing.T) { + _, c := newTestCache(t) + + out := c.Queue() + + // We've checked the contents of queue in the writeback tests + // Just check it is present here + queue, found := out["queue"] + require.True(t, found) + _, ok := queue.([]writeback.QueueInfo) + require.True(t, ok) +} diff --git a/vfs/vfscache/writeback/writeback.go b/vfs/vfscache/writeback/writeback.go index 33c5616c2..cf3380d4f 100644 --- a/vfs/vfscache/writeback/writeback.go +++ b/vfs/vfscache/writeback/writeback.go @@ -6,6 +6,7 @@ import ( "container/heap" "context" "errors" + "sort" "sync" "sync/atomic" "time" @@ -466,3 +467,47 @@ func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) { defer wb.mu.Unlock() return wb.uploads, len(wb.items) } + +// QueueInfo is information about an item queued for upload, returned +// by Queue +type QueueInfo struct { + Name string `json:"name"` // name (full path) of the file, + ID Handle `json:"id"` // id of queue item + Size int64 `json:"size"` // integer size of the file in bytes + Expiry float64 `json:"expiry"` // seconds from now which the file is eligible for transfer, oldest goes first + Tries int `json:"tries"` // number of times we have tried to upload + Delay float64 `json:"delay"` // delay between upload attempts (s) + Uploading bool `json:"uploading"` // true if item is being uploaded +} + +// Queue return info about the current upload queue +func (wb *WriteBack) Queue() []QueueInfo { + wb.mu.Lock() + defer wb.mu.Unlock() + + items := make([]QueueInfo, 0, len(wb.lookup)) + now := time.Now() + + // Lookup all the items in no particular order + for _, wbItem := range wb.lookup { + items = append(items, QueueInfo{ + Name: wbItem.name, + ID: wbItem.id, + Size: wbItem.size, + Expiry: wbItem.expiry.Sub(now).Seconds(), + Tries: wbItem.tries, + Delay: wbItem.delay.Seconds(), + Uploading: wbItem.uploading, + }) + } + + // Sort by Uploading first then Expiry + sort.Slice(items, func(i, j int) bool { + if items[i].Uploading != items[j].Uploading { + return items[i].Uploading + } + return items[i].Expiry < items[j].Expiry + }) + + return items +} diff --git a/vfs/vfscache/writeback/writeback_test.go b/vfs/vfscache/writeback/writeback_test.go index 21d8bae7e..b862e283e 100644 --- a/vfs/vfscache/writeback/writeback_test.go +++ b/vfs/vfscache/writeback/writeback_test.go @@ -13,6 +13,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/vfs/vfscommon" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) { @@ -493,6 +494,57 @@ func TestWriteBackGetStats(t *testing.T) { } +func TestWriteBackQueue(t *testing.T) { + wb, cancel := newTestWriteBack(t) + defer cancel() + + pi := newPutItem(t) + + id := wb.Add(0, "one", 10, true, pi.put) + + queue := wb.Queue() + require.Equal(t, 1, len(queue)) + assert.Greater(t, queue[0].Expiry, 0.0) + assert.Less(t, queue[0].Expiry, 1.0) + queue[0].Expiry = 0.0 + assert.Equal(t, []QueueInfo{ + { + Name: "one", + Size: 10, + Expiry: 0.0, + Tries: 0, + Delay: 0.1, + Uploading: false, + ID: id, + }, + }, queue) + + <-pi.started + + queue = wb.Queue() + require.Equal(t, 1, len(queue)) + assert.Less(t, queue[0].Expiry, 0.0) + assert.Greater(t, queue[0].Expiry, -1.0) + queue[0].Expiry = 0.0 + assert.Equal(t, []QueueInfo{ + { + Name: "one", + Size: 10, + Expiry: 0.0, + Tries: 1, + Delay: 0.1, + Uploading: true, + ID: id, + }, + }, queue) + + pi.finish(nil) // transfer successful + waitUntilNoTransfers(t, wb) + + queue = wb.Queue() + assert.Equal(t, []QueueInfo{}, queue) +} + // Test queuing more than fs.Config.Transfers func TestWriteBackMaxQueue(t *testing.T) { ctx := context.Background()