dropbox: make sure we wait for previous batch to finish before starting a new one

This commit is contained in:
Nick Craig-Wood 2020-09-08 15:00:45 +01:00
parent 56e8d75cab
commit c741c02fb6

View File

@ -34,6 +34,7 @@ import (
"time" "time"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/auth" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/auth"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/common" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/common"
"github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files"
@ -222,6 +223,7 @@ type Object struct {
type batcher struct { type batcher struct {
f *Fs // Fs this batch is part of f *Fs // Fs this batch is part of
mu sync.Mutex // lock for vars below mu sync.Mutex // lock for vars below
commitMu sync.Mutex // lock for waiting for batch
maxBatch int // maximum size for batch maxBatch int // maximum size for batch
active int // number of batches being sent active int // number of batches being sent
items []*files.UploadSessionFinishArg // current uncommitted files items []*files.UploadSessionFinishArg // current uncommitted files
@ -265,23 +267,51 @@ func (b *batcher) End(started bool) error {
return b._commit(false) return b._commit(false)
} }
// Waits for the batch to complete - call with batchMu held
func (b *batcher) _waitForBatchResult(res *files.UploadSessionFinishBatchLaunch) (batchResult *files.UploadSessionFinishBatchResult, err error) {
if res.AsyncJobId == "" {
return res.Complete, nil
}
var batchStatus *files.UploadSessionFinishBatchJobStatus
sleepTime := time.Second
const maxTries = 120
for try := 1; try <= maxTries; try++ {
err = b.f.pacer.Call(func() (bool, error) {
batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{
AsyncJobId: res.AsyncJobId,
})
return shouldRetry(err)
})
if err != nil {
fs.Errorf(b.f, "failed to wait for batch: %v", err)
break
}
if batchStatus.Tag == "complete" {
break
}
fs.Debugf(b.f, "sleeping for %v to wait for batch to complete, try %d/%d", sleepTime, try, maxTries)
time.Sleep(sleepTime)
}
return batchStatus.Complete, nil
}
// commit a batch - call with batchMu held // commit a batch - call with batchMu held
// //
// if finalizing is true then it doesn't unregister Finalize as this // if finalizing is true then it doesn't unregister Finalize as this
// causes a deadlock during finalization. // causes a deadlock during finalization.
func (b *batcher) _commit(finalizing bool) (err error) { func (b *batcher) _commit(finalizing bool) (err error) {
b.commitMu.Lock()
batch := "batch" batch := "batch"
if finalizing { if finalizing {
batch = "last batch" batch = "last batch"
} }
fs.Debugf(b.f, "comitting %s length %d", batch, len(b.items)) fs.Debugf(b.f, "comitting %s length %d", batch, len(b.items))
// FIXME this ignores the objects returned
var arg = &files.UploadSessionFinishBatchArg{ var arg = &files.UploadSessionFinishBatchArg{
Entries: b.items, Entries: b.items,
} }
//var res *file.UploadSessionFinishBatchLaunch var res *files.UploadSessionFinishBatchLaunch
err = b.f.pacer.Call(func() (bool, error) { err = b.f.pacer.Call(func() (bool, error) {
_, err = b.f.srv.UploadSessionFinishBatch(arg) res, err = b.f.srv.UploadSessionFinishBatch(arg)
// If error is insufficient space then don't retry // If error is insufficient space then don't retry
if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e, ok := err.(files.UploadSessionFinishAPIError); ok {
if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
@ -293,14 +323,32 @@ func (b *batcher) _commit(finalizing bool) (err error) {
return err != nil, err return err != nil, err
}) })
if err != nil { if err != nil {
b.commitMu.Unlock()
return err return err
} }
// Show batches are empty
// Clear batch
b.items = nil b.items = nil
if !finalizing {
atexit.Unregister(b.atexit) // If finalizing, don't unregister or get result
b.atexit = nil if finalizing {
b.commitMu.Unlock()
return nil
} }
// Unregister the atexit since queue is empty
atexit.Unregister(b.atexit)
b.atexit = nil
// Wait for the batch to finish before we proceed in the background
go func() {
defer b.commitMu.Unlock()
_, err = b._waitForBatchResult(res)
if err != nil {
fs.Errorf(b.f, "Error waiting for batch to finish: %v", err)
}
}()
return nil return nil
} }
@ -1203,7 +1251,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
fs.Debugf(o, "Streaming chunk %d/%d", cur, cur) fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
} else if chunks == 0 { } else if chunks == 0 {
fs.Debugf(o, "Streaming chunk %d/unknown", cur) fs.Debugf(o, "Streaming chunk %d/unknown", cur)
} else { } else if chunks != 1 {
fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks) fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
} }
} }