mirror of
https://github.com/rclone/rclone.git
synced 2024-11-25 09:41:44 +08:00
vfs: decouple writeback from Item so it can be tested
This commit is contained in:
parent
7e4ba54608
commit
43018973ac
|
@ -608,7 +608,9 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
|||
} else {
|
||||
// asynchronous writeback
|
||||
item.mu.Unlock()
|
||||
item.c.writeback.add(item, item.name, item.modified, storeFn)
|
||||
item.c.writeback.add(item, item.name, item.modified, func(ctx context.Context) error {
|
||||
return item.store(ctx, storeFn)
|
||||
})
|
||||
item.mu.Lock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,9 @@ const (
|
|||
maxUploadDelay = 5 * time.Minute // max delay betwen upload attempts
|
||||
)
|
||||
|
||||
// putFn is the interface that item provides to store the data
|
||||
type putFn func(context.Context) error
|
||||
|
||||
// writeBack keeps track of the items which need to be written back to the disk at some point
|
||||
type writeBack struct {
|
||||
mu sync.Mutex
|
||||
|
@ -61,7 +64,7 @@ type writeBackItem struct {
|
|||
onHeap bool // true if this item is on the items heap
|
||||
cancel context.CancelFunc // To cancel the upload with
|
||||
done chan struct{} // closed when the cancellation completes
|
||||
storeFn StoreFn // To write the object back with
|
||||
putFn putFn // To write the object data
|
||||
tries int // number of times we have tried to upload
|
||||
delay time.Duration // delay between upload attempts
|
||||
}
|
||||
|
@ -204,7 +207,7 @@ func (wb *writeBack) _resetTimer() {
|
|||
// is already there.
|
||||
//
|
||||
// if modified is false then it it doesn't a pending upload
|
||||
func (wb *writeBack) add(item *Item, name string, modified bool, storeFn StoreFn) {
|
||||
func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
|
||||
|
@ -219,7 +222,7 @@ func (wb *writeBack) add(item *Item, name string, modified bool, storeFn StoreFn
|
|||
// Kick the timer on
|
||||
wb.items._update(wbItem, wb._newExpiry())
|
||||
}
|
||||
wbItem.storeFn = storeFn
|
||||
wbItem.putFn = putFn
|
||||
wb._resetTimer()
|
||||
}
|
||||
|
||||
|
@ -264,11 +267,11 @@ func (wb *writeBack) _kickUploader() {
|
|||
func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
item := wbItem.item
|
||||
putFn := wbItem.putFn
|
||||
wbItem.tries++
|
||||
|
||||
wb.mu.Unlock()
|
||||
err := item.store(ctx, wbItem.storeFn)
|
||||
err := putFn(ctx)
|
||||
wb.mu.Lock()
|
||||
|
||||
wbItem.cancel() // cancel context to release resources since store done
|
||||
|
|
Loading…
Reference in New Issue
Block a user