package writeback import ( "container/heap" "context" "errors" "fmt" "strings" "sync" "testing" "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/vfs/vfscommon" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) { ctx, cancel := context.WithCancel(context.Background()) opt := vfscommon.Opt opt.WriteBack = fs.Duration(100 * time.Millisecond) wb = New(ctx, &opt) return wb, cancel } // string for debugging - make a copy and pop the items out in order func (wb *WriteBack) string(t *testing.T) string { wb.mu.Lock() defer wb.mu.Unlock() ws := wb.items // check indexes OK first for i := range ws { assert.Equal(t, i, ws[i].index, ws[i].name) } wsCopy := make(writeBackItems, len(ws)) // deep copy the elements for i := range wsCopy { item := *ws[i] wsCopy[i] = &item } // print them var out []string for wsCopy.Len() > 0 { out = append(out, heap.Pop(&wsCopy).(*writeBackItem).name) } return strings.Join(out, ",") } func TestWriteBackItems(t *testing.T) { // Test the items heap behaves properly now := time.Now() wbItem1 := writeBackItem{name: "one", expiry: now.Add(1 * time.Second)} wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)} wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)} wb := &WriteBack{ items: writeBackItems{}, } heap.Init(&wb.items) assert.Equal(t, "", wb.string(t)) heap.Push(&wb.items, &wbItem2) assert.Equal(t, "two", wb.string(t)) heap.Push(&wb.items, &wbItem3) assert.Equal(t, "two,three", wb.string(t)) heap.Push(&wb.items, &wbItem1) assert.Equal(t, "one,two,three", wb.string(t)) wb.items._update(&wbItem1, now.Add(3*time.Second)) assert.Equal(t, "two,one,three", wb.string(t)) wb.items._update(&wbItem1, now.Add(5*time.Second)) assert.Equal(t, "two,three,one", wb.string(t)) // Set all times the same - should sort in insertion order wb.items._update(&wbItem1, now) wb.items._update(&wbItem2, now) wb.items._update(&wbItem3, now) assert.Equal(t, "one,two,three", wb.string(t)) } func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.True(t, wbItem.onHeap) for i := range wb.items { if wb.items[i] == wbItem { return } } assert.Failf(t, "expecting %q on heap", wbItem.name) } func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.False(t, wbItem.onHeap) for i := range wb.items { if wb.items[i] == wbItem { t.Errorf("not expecting %q on heap", wbItem.name) } } } func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.Equal(t, wbItem, wb.lookup[wbItem.id]) } func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() assert.Nil(t, wb.lookup[wbItem.id]) } func TestWriteBackItemCRUD(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() // _peekItem empty assert.Nil(t, wb._peekItem()) wbItem1 := wb._newItem(0, "one", 10) checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) wbItem2 := wb._newItem(0, "two", 10) checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) wbItem3 := wb._newItem(0, "three", 10) checkOnHeap(t, wb, wbItem3) checkInLookup(t, wb, wbItem3) assert.Equal(t, "one,two,three", wb.string(t)) // _delItem wb._delItem(wbItem2) checkOnHeap(t, wb, wbItem2) checkNotInLookup(t, wb, wbItem2) // _addItem wb._addItem(wbItem2) checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) // _popItem assert.True(t, wbItem1.onHeap) poppedWbItem := wb._popItem() assert.Equal(t, wbItem1, poppedWbItem) checkNotOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) assert.Equal(t, "two,three", wb.string(t)) // _pushItem wb._pushItem(wbItem1) checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) assert.Equal(t, "one,two,three", wb.string(t)) // push twice wb._pushItem(wbItem1) assert.Equal(t, "one,two,three", wb.string(t)) // _peekItem assert.Equal(t, wbItem1, wb._peekItem()) // _removeItem assert.Equal(t, "one,two,three", wb.string(t)) wb._removeItem(wbItem2) checkNotOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) assert.Equal(t, "one,three", wb.string(t)) // remove twice wb._removeItem(wbItem2) checkNotOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) assert.Equal(t, "one,three", wb.string(t)) } func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) { wb.mu.Lock() if running { assert.NotEqual(t, time.Time{}, wb.expiry) assert.NotNil(t, wb.timer) } else { assert.Equal(t, time.Time{}, wb.expiry) assert.Nil(t, wb.timer) } wb.mu.Unlock() } func TestWriteBackResetTimer(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() // Reset the timer on an empty queue wb._resetTimer() // Check timer is stopped assertTimerRunning(t, wb, false) _ = wb._newItem(0, "three", 10) // Reset the timer on an queue with stuff wb._resetTimer() // Check timer is not stopped assertTimerRunning(t, wb, true) } // A "transfer" for testing type putItem struct { wg sync.WaitGroup mu sync.Mutex t *testing.T started chan struct{} errChan chan error running bool cancelled bool called bool } func newPutItem(t *testing.T) *putItem { return &putItem{ t: t, started: make(chan struct{}, 1), } } // put the object as per PutFn interface func (pi *putItem) put(ctx context.Context) (err error) { pi.wg.Add(1) defer pi.wg.Done() pi.mu.Lock() pi.called = true if pi.running { assert.Fail(pi.t, "upload already running") } pi.running = true pi.errChan = make(chan error, 1) pi.mu.Unlock() pi.started <- struct{}{} cancelled := false select { case err = <-pi.errChan: case <-ctx.Done(): err = ctx.Err() cancelled = true } pi.mu.Lock() pi.running = false pi.cancelled = cancelled pi.mu.Unlock() return err } // finish the "transfer" with the error passed in func (pi *putItem) finish(err error) { pi.mu.Lock() if !pi.running { assert.Fail(pi.t, "upload not running") } pi.mu.Unlock() pi.errChan <- err pi.wg.Wait() } func waitUntilNoTransfers(t *testing.T, wb *WriteBack) { for i := 0; i < 100; i++ { wb.mu.Lock() uploads := wb.uploads wb.mu.Unlock() if uploads == 0 { return } time.Sleep(10 * time.Millisecond) } t.Errorf("failed to wait for transfer to finish") } // This is the happy path with everything working func TestWriteBackAddOK(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) var inID Handle wb.SetID(&inID) assert.Equal(t, Handle(1), inID) id := wb.Add(inID, "one", 10, true, pi.put) assert.Equal(t, inID, id) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) pi.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) } // Now test the upload failing and being retried func TestWriteBackAddFailRetry(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) pi.finish(errors.New("transfer failed BOOM")) waitUntilNoTransfers(t, wb) checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // check the retry <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) pi.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) } // Now test the upload being cancelled by another upload being added func TestWriteBackAddUpdate(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] assert.Equal(t, int64(10), wbItem.size) // check size checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // Now the upload has started add another one pi2 := newPutItem(t) id2 := wb.Add(id, "one", 20, true, pi2.put) assert.Equal(t, id, id2) assert.Equal(t, int64(20), wbItem.size) // check size has changed checkOnHeap(t, wb, wbItem) // object awaiting writeback time checkInLookup(t, wb, wbItem) // check the previous transfer was cancelled assert.True(t, pi.cancelled) // check the retry <-pi2.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) pi2.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) } // Now test the upload being not cancelled by another upload being added func TestWriteBackAddUpdateNotModified(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) id := wb.Add(0, "one", 10, false, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // Now the upload has started add another one pi2 := newPutItem(t) id2 := wb.Add(id, "one", 10, false, pi2.put) assert.Equal(t, id, id2) checkNotOnHeap(t, wb, wbItem) // object still being transferred checkInLookup(t, wb, wbItem) // Because modified was false above this should not cancel the // transfer assert.False(t, pi.cancelled) // wait for original transfer to finish pi.finish(nil) waitUntilNoTransfers(t, wb) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) assert.False(t, pi2.called) } // Now test the upload being not cancelled by another upload being // added because the upload hasn't started yet func TestWriteBackAddUpdateNotStarted(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) // Immediately add another upload before the first has started pi2 := newPutItem(t) id2 := wb.Add(id, "one", 10, true, pi2.put) assert.Equal(t, id, id2) checkOnHeap(t, wb, wbItem) // object still awaiting transfer checkInLookup(t, wb, wbItem) // Wait for the upload to start <-pi2.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // Because modified was false above this should not cancel the // transfer assert.False(t, pi.cancelled) // wait for new transfer to finish pi2.finish(nil) waitUntilNoTransfers(t, wb) checkNotOnHeap(t, wb, wbItem) checkNotInLookup(t, wb, wbItem) assert.False(t, pi.called) } func TestWriteBackGetStats(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) wb.Add(0, "one", 10, true, pi.put) inProgress, queued := wb.Stats() assert.Equal(t, queued, 1) assert.Equal(t, inProgress, 0) <-pi.started inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 1) pi.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 0) } func TestWriteBackQueue(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() pi := newPutItem(t) id := wb.Add(0, "one", 10, true, pi.put) queue := wb.Queue() require.Equal(t, 1, len(queue)) assert.Greater(t, queue[0].Expiry, 0.0) assert.Less(t, queue[0].Expiry, 1.0) queue[0].Expiry = 0.0 assert.Equal(t, []QueueInfo{ { Name: "one", Size: 10, Expiry: 0.0, Tries: 0, Delay: 0.1, Uploading: false, ID: id, }, }, queue) <-pi.started queue = wb.Queue() require.Equal(t, 1, len(queue)) assert.Less(t, queue[0].Expiry, 0.0) assert.Greater(t, queue[0].Expiry, -1.0) queue[0].Expiry = 0.0 assert.Equal(t, []QueueInfo{ { Name: "one", Size: 10, Expiry: 0.0, Tries: 1, Delay: 0.1, Uploading: true, ID: id, }, }, queue) pi.finish(nil) // transfer successful waitUntilNoTransfers(t, wb) queue = wb.Queue() assert.Equal(t, []QueueInfo{}, queue) } // Test queuing more than fs.Config.Transfers func TestWriteBackMaxQueue(t *testing.T) { ctx := context.Background() ci := fs.GetConfig(ctx) wb, cancel := newTestWriteBack(t) defer cancel() maxTransfers := ci.Transfers toTransfer := maxTransfers + 2 // put toTransfer things in the queue pis := []*putItem{} for i := 0; i < toTransfer; i++ { pi := newPutItem(t) pis = append(pis, pi) wb.Add(0, fmt.Sprintf("number%d", 1), 10, true, pi.put) } inProgress, queued := wb.Stats() assert.Equal(t, toTransfer, queued) assert.Equal(t, 0, inProgress) // now start the first maxTransfers - this should stop the timer for i := 0; i < maxTransfers; i++ { <-pis[i].started } // timer should be stopped now assertTimerRunning(t, wb, false) inProgress, queued = wb.Stats() assert.Equal(t, toTransfer-maxTransfers, queued) assert.Equal(t, maxTransfers, inProgress) // now finish the first maxTransfers for i := 0; i < maxTransfers; i++ { pis[i].finish(nil) } // now start and finish the remaining transfers one at a time for i := maxTransfers; i < toTransfer; i++ { <-pis[i].started pis[i].finish(nil) } waitUntilNoTransfers(t, wb) inProgress, queued = wb.Stats() assert.Equal(t, queued, 0) assert.Equal(t, inProgress, 0) } func TestWriteBackRename(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() // cancel when not in writeback wb.Rename(1, "nonExistent") // add item pi1 := newPutItem(t) id := wb.Add(0, "one", 10, true, pi1.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, wbItem.name, "one") // rename when not uploading wb.Rename(id, "two") checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.False(t, pi1.cancelled) assert.Equal(t, wbItem.name, "two") // add item pi2 := newPutItem(t) id = wb.Add(id, "two", 10, true, pi2.put) wbItem = wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, wbItem.name, "two") // wait for upload to start <-pi2.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // rename when uploading - goes back on heap wb.Rename(id, "three") checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.True(t, pi2.cancelled) assert.Equal(t, wbItem.name, "three") } // TestWriteBackRenameDuplicates checks that if we rename an entry and // make a duplicate, we remove the duplicate. func TestWriteBackRenameDuplicates(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() // add item "one", 10 pi1 := newPutItem(t) id1 := wb.Add(0, "one", 10, true, pi1.put) wbItem1 := wb.lookup[id1] checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) assert.Equal(t, wbItem1.name, "one") <-pi1.started checkNotOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) // add item "two" pi2 := newPutItem(t) id2 := wb.Add(0, "two", 10, true, pi2.put) wbItem2 := wb.lookup[id2] checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) assert.Equal(t, wbItem2.name, "two") <-pi2.started checkNotOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) // rename "two" to "one" wb.Rename(id2, "one") // check "one" is cancelled and removed from heap and lookup checkNotOnHeap(t, wb, wbItem1) checkNotInLookup(t, wb, wbItem1) assert.True(t, pi1.cancelled) assert.Equal(t, wbItem1.name, "one") // check "two" (now called "one"!) has been cancelled and will // be retried checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) assert.True(t, pi2.cancelled) assert.Equal(t, wbItem2.name, "one") } func TestWriteBackCancelUpload(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() // cancel when not in writeback assert.False(t, wb.cancelUpload(1)) // add item pi := newPutItem(t) id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // cancel when not uploading assert.False(t, wb.cancelUpload(id)) checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // wait for upload to start <-pi.started checkNotOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) // cancel when uploading assert.True(t, wb.cancelUpload(id)) checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.True(t, pi.cancelled) }