vfs: keep a record of the file size in the writeback queue

This commit is contained in:
Nick Craig-Wood 2024-06-20 15:20:47 +01:00
parent 842396c8a0
commit bfec159504
3 changed files with 30 additions and 25 deletions

View File

@ -737,7 +737,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
item.c.writeback.SetID(&item.writeBackID) item.c.writeback.SetID(&item.writeBackID)
id := item.writeBackID id := item.writeBackID
item.mu.Unlock() item.mu.Unlock()
item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error { item.c.writeback.Add(id, item.name, item.info.Size, item.modified, func(ctx context.Context) error {
return item.store(ctx, storeFn) return item.store(ctx, storeFn)
}) })
item.mu.Lock() item.mu.Lock()

View File

@ -62,6 +62,7 @@ func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
// writeBack.mu must be held to manipulate this // writeBack.mu must be held to manipulate this
type writeBackItem struct { type writeBackItem struct {
name string // name of the item so we don't have to read it from item name string // name of the item so we don't have to read it from item
size int64 // size of the item so we don't have to read it from item
id Handle // id of the item id Handle // id of the item
index int // index into the priority queue for update index int // index into the priority queue for update
expiry time.Time // When this expires we will write it back expiry time.Time // When this expires we will write it back
@ -135,10 +136,11 @@ func (wb *WriteBack) _newExpiry() time.Time {
// make a new writeBackItem // make a new writeBackItem
// //
// call with the lock held // call with the lock held
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem { func (wb *WriteBack) _newItem(id Handle, name string, size int64) *writeBackItem {
wb.SetID(&id) wb.SetID(&id)
wbItem := &writeBackItem{ wbItem := &writeBackItem{
name: name, name: name,
size: size,
expiry: wb._newExpiry(), expiry: wb._newExpiry(),
delay: time.Duration(wb.opt.WriteBack), delay: time.Duration(wb.opt.WriteBack),
id: id, id: id,
@ -256,13 +258,13 @@ func (wb *WriteBack) SetID(pid *Handle) {
// //
// If modified is false then it it doesn't cancel a pending upload if // If modified is false then it it doesn't cancel a pending upload if
// there is one as there is no need. // there is one as there is no need.
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle { func (wb *WriteBack) Add(id Handle, name string, size int64, modified bool, putFn PutFn) Handle {
wb.mu.Lock() wb.mu.Lock()
defer wb.mu.Unlock() defer wb.mu.Unlock()
wbItem, ok := wb.lookup[id] wbItem, ok := wb.lookup[id]
if !ok { if !ok {
wbItem = wb._newItem(id, name) wbItem = wb._newItem(id, name, size)
} else { } else {
if wbItem.uploading && modified { if wbItem.uploading && modified {
// We are uploading already so cancel the upload // We are uploading already so cancel the upload
@ -272,6 +274,7 @@ func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Han
wb.items._update(wbItem, wb._newExpiry()) wb.items._update(wbItem, wb._newExpiry())
} }
wbItem.putFn = putFn wbItem.putFn = putFn
wbItem.size = size
wb._resetTimer() wb._resetTimer()
return wbItem.id return wbItem.id
} }

View File

@ -122,15 +122,15 @@ func TestWriteBackItemCRUD(t *testing.T) {
// _peekItem empty // _peekItem empty
assert.Nil(t, wb._peekItem()) assert.Nil(t, wb._peekItem())
wbItem1 := wb._newItem(0, "one") wbItem1 := wb._newItem(0, "one", 10)
checkOnHeap(t, wb, wbItem1) checkOnHeap(t, wb, wbItem1)
checkInLookup(t, wb, wbItem1) checkInLookup(t, wb, wbItem1)
wbItem2 := wb._newItem(0, "two") wbItem2 := wb._newItem(0, "two", 10)
checkOnHeap(t, wb, wbItem2) checkOnHeap(t, wb, wbItem2)
checkInLookup(t, wb, wbItem2) checkInLookup(t, wb, wbItem2)
wbItem3 := wb._newItem(0, "three") wbItem3 := wb._newItem(0, "three", 10)
checkOnHeap(t, wb, wbItem3) checkOnHeap(t, wb, wbItem3)
checkInLookup(t, wb, wbItem3) checkInLookup(t, wb, wbItem3)
@ -201,7 +201,7 @@ func TestWriteBackResetTimer(t *testing.T) {
// Check timer is stopped // Check timer is stopped
assertTimerRunning(t, wb, false) assertTimerRunning(t, wb, false)
_ = wb._newItem(0, "three") _ = wb._newItem(0, "three", 10)
// Reset the timer on an queue with stuff // Reset the timer on an queue with stuff
wb._resetTimer() wb._resetTimer()
@ -297,7 +297,7 @@ func TestWriteBackAddOK(t *testing.T) {
wb.SetID(&inID) wb.SetID(&inID)
assert.Equal(t, Handle(1), inID) assert.Equal(t, Handle(1), inID)
id := wb.Add(inID, "one", true, pi.put) id := wb.Add(inID, "one", 10, true, pi.put)
assert.Equal(t, inID, id) assert.Equal(t, inID, id)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
@ -321,7 +321,7 @@ func TestWriteBackAddFailRetry(t *testing.T) {
pi := newPutItem(t) pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put) id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -354,8 +354,9 @@ func TestWriteBackAddUpdate(t *testing.T) {
pi := newPutItem(t) pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put) id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
assert.Equal(t, int64(10), wbItem.size) // check size
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t)) assert.Equal(t, "one", wb.string(t))
@ -367,8 +368,9 @@ func TestWriteBackAddUpdate(t *testing.T) {
// Now the upload has started add another one // Now the upload has started add another one
pi2 := newPutItem(t) pi2 := newPutItem(t)
id2 := wb.Add(id, "one", true, pi2.put) id2 := wb.Add(id, "one", 20, true, pi2.put)
assert.Equal(t, id, id2) assert.Equal(t, id, id2)
assert.Equal(t, int64(20), wbItem.size) // check size has changed
checkOnHeap(t, wb, wbItem) // object awaiting writeback time checkOnHeap(t, wb, wbItem) // object awaiting writeback time
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -393,7 +395,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
pi := newPutItem(t) pi := newPutItem(t)
id := wb.Add(0, "one", false, pi.put) id := wb.Add(0, "one", 10, false, pi.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -406,7 +408,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
// Now the upload has started add another one // Now the upload has started add another one
pi2 := newPutItem(t) pi2 := newPutItem(t)
id2 := wb.Add(id, "one", false, pi2.put) id2 := wb.Add(id, "one", 10, false, pi2.put)
assert.Equal(t, id, id2) assert.Equal(t, id, id2)
checkNotOnHeap(t, wb, wbItem) // object still being transferred checkNotOnHeap(t, wb, wbItem) // object still being transferred
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -432,7 +434,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
pi := newPutItem(t) pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put) id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -441,7 +443,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
// Immediately add another upload before the first has started // Immediately add another upload before the first has started
pi2 := newPutItem(t) pi2 := newPutItem(t)
id2 := wb.Add(id, "one", true, pi2.put) id2 := wb.Add(id, "one", 10, true, pi2.put)
assert.Equal(t, id, id2) assert.Equal(t, id, id2)
checkOnHeap(t, wb, wbItem) // object still awaiting transfer checkOnHeap(t, wb, wbItem) // object still awaiting transfer
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -470,7 +472,7 @@ func TestWriteBackGetStats(t *testing.T) {
pi := newPutItem(t) pi := newPutItem(t)
wb.Add(0, "one", true, pi.put) wb.Add(0, "one", 10, true, pi.put)
inProgress, queued := wb.Stats() inProgress, queued := wb.Stats()
assert.Equal(t, queued, 1) assert.Equal(t, queued, 1)
@ -506,7 +508,7 @@ func TestWriteBackMaxQueue(t *testing.T) {
for i := 0; i < toTransfer; i++ { for i := 0; i < toTransfer; i++ {
pi := newPutItem(t) pi := newPutItem(t)
pis = append(pis, pi) pis = append(pis, pi)
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put) wb.Add(0, fmt.Sprintf("number%d", 1), 10, true, pi.put)
} }
inProgress, queued := wb.Stats() inProgress, queued := wb.Stats()
@ -551,7 +553,7 @@ func TestWriteBackRename(t *testing.T) {
// add item // add item
pi1 := newPutItem(t) pi1 := newPutItem(t)
id := wb.Add(0, "one", true, pi1.put) id := wb.Add(0, "one", 10, true, pi1.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -566,7 +568,7 @@ func TestWriteBackRename(t *testing.T) {
// add item // add item
pi2 := newPutItem(t) pi2 := newPutItem(t)
id = wb.Add(id, "two", true, pi2.put) id = wb.Add(id, "two", 10, true, pi2.put)
wbItem = wb.lookup[id] wbItem = wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)
@ -591,9 +593,9 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
wb, cancel := newTestWriteBack(t) wb, cancel := newTestWriteBack(t)
defer cancel() defer cancel()
// add item "one" // add item "one", 10
pi1 := newPutItem(t) pi1 := newPutItem(t)
id1 := wb.Add(0, "one", true, pi1.put) id1 := wb.Add(0, "one", 10, true, pi1.put)
wbItem1 := wb.lookup[id1] wbItem1 := wb.lookup[id1]
checkOnHeap(t, wb, wbItem1) checkOnHeap(t, wb, wbItem1)
checkInLookup(t, wb, wbItem1) checkInLookup(t, wb, wbItem1)
@ -605,7 +607,7 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
// add item "two" // add item "two"
pi2 := newPutItem(t) pi2 := newPutItem(t)
id2 := wb.Add(0, "two", true, pi2.put) id2 := wb.Add(0, "two", 10, true, pi2.put)
wbItem2 := wb.lookup[id2] wbItem2 := wb.lookup[id2]
checkOnHeap(t, wb, wbItem2) checkOnHeap(t, wb, wbItem2)
checkInLookup(t, wb, wbItem2) checkInLookup(t, wb, wbItem2)
@ -641,7 +643,7 @@ func TestWriteBackCancelUpload(t *testing.T) {
// add item // add item
pi := newPutItem(t) pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put) id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id] wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem) checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem) checkInLookup(t, wb, wbItem)