From 8506066926fbbcb45829e409f73996b4b149560a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 20 Jun 2020 10:26:25 +0100 Subject: [PATCH] vfs: use call after functions in writeback to simplify code This also fixes a bug in the uploader which didn't restart the timer when the queue was empty. --- vfs/vfscache/writeback.go | 75 +++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/vfs/vfscache/writeback.go b/vfs/vfscache/writeback.go index c4bcde4ba..84b4efece 100644 --- a/vfs/vfscache/writeback.go +++ b/vfs/vfscache/writeback.go @@ -22,12 +22,13 @@ type putFn func(context.Context) error // writeBack keeps track of the items which need to be written back to the disk at some point type writeBack struct { + ctx context.Context mu sync.Mutex items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item - writeBackItems are in here until cancelled opt *vfscommon.Options // VFS options timer *time.Timer // next scheduled time for the uploader - kick chan struct{} // send on this channel to wake up the uploader + expiry time.Time // time the next item exires or IsZero uploads int // number of uploads in progress } @@ -36,15 +37,12 @@ type writeBack struct { // cancel the context to stop the background goroutine func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack { wb := &writeBack{ + ctx: ctx, items: writeBackItems{}, lookup: make(map[*Item]*writeBackItem), opt: opt, - timer: time.NewTimer(time.Second), - kick: make(chan struct{}, 1), } - wb.timer.Stop() heap.Init(&wb.items) - go wb.uploader(ctx) return wb } @@ -193,13 +191,31 @@ func (wb *writeBack) _peekItem() (wbItem *writeBackItem) { func (wb *writeBack) _resetTimer() { wbItem := wb._peekItem() if wbItem == nil { - wb.timer.Stop() + if wb.expiry.IsZero() { + return + } + wb.expiry = time.Time{} + fs.Debugf(nil, "resetTimer STOP") + if wb.timer != nil { + wb.timer.Stop() + wb.timer = nil + } } else { + if wb.expiry.Equal(wbItem.expiry) { + return + } + wb.expiry = wbItem.expiry dt := time.Until(wbItem.expiry) if dt < 0 { dt = 0 } - wb.timer.Reset(dt) + fs.Debugf(nil, "resetTimer dt=%v", dt) + if wb.timer != nil { + wb.timer.Stop() + } + wb.timer = time.AfterFunc(dt, func() { + wb.processItems(wb.ctx) + }) } } @@ -207,7 +223,7 @@ func (wb *writeBack) _resetTimer() { // is already there. // // if modified is false then it it doesn't a pending upload -func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) { +func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) *writeBackItem { wb.mu.Lock() defer wb.mu.Unlock() @@ -224,6 +240,7 @@ func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) { } wbItem.putFn = putFn wb._resetTimer() + return wbItem } // Call when a file is removed. This cancels a writeback if there is @@ -248,19 +265,6 @@ func (wb *writeBack) remove(item *Item) (found bool) { return found } -// kick the upload checker -// -// This should be called at the end of uploads just in case we had to -// pause uploades because max items was exceeded -// -// call with the lock held -func (wb *writeBack) _kickUploader() { - select { - case wb.kick <- struct{}{}: - default: - } -} - // upload the item - called as a goroutine // // uploading will have been incremented here already @@ -276,7 +280,7 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { wbItem.cancel() // cancel context to release resources since store done - fs.Debugf(wbItem.name, "uploading = false %p item %p", wbItem, wbItem.item) + //fs.Debugf(wbItem.name, "uploading = false %p item %p", wbItem, wbItem.item) wbItem.uploading = false wb.uploads-- @@ -301,7 +305,7 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { // show that we are done with the item wb._delItem(wbItem) } - wb._kickUploader() + wb._resetTimer() close(wbItem.done) } @@ -348,7 +352,11 @@ func (wb *writeBack) processItems(ctx context.Context) { wb.mu.Lock() defer wb.mu.Unlock() - resetTimer := false + if wb.ctx.Err() != nil { + return + } + + resetTimer := true for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() { // If reached transfer limit don't restart the timer if wb.uploads >= fs.Config.Transfers { @@ -356,10 +364,9 @@ func (wb *writeBack) processItems(ctx context.Context) { resetTimer = false break } - resetTimer = true // Pop the item, mark as uploading and start the uploader wbItem = wb._popItem() - fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item) + //fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item) wbItem.uploading = true wb.uploads++ newCtx, cancel := context.WithCancel(ctx) @@ -373,22 +380,6 @@ func (wb *writeBack) processItems(ctx context.Context) { } } -// Looks for items which need writing back and write them back until -// the context is cancelled -func (wb *writeBack) uploader(ctx context.Context) { - for { - select { - case <-ctx.Done(): - wb.timer.Stop() - return - case <-wb.timer.C: - wb.processItems(ctx) - case <-wb.kick: - wb.processItems(ctx) - } - } -} - // return the number of uploads in progress func (wb *writeBack) getStats() (uploadsInProgress, uploadsQueued int) { wb.mu.Lock()