diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go index 0bf5c4567..58df03367 100755 --- a/backend/dropbox/dropbox.go +++ b/backend/dropbox/dropbox.go @@ -34,6 +34,7 @@ import ( "time" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox" + "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/async" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/auth" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/common" "github.com/dropbox/dropbox-sdk-go-unofficial/dropbox/files" @@ -222,6 +223,7 @@ type Object struct { type batcher struct { f *Fs // Fs this batch is part of mu sync.Mutex // lock for vars below + commitMu sync.Mutex // lock for waiting for batch maxBatch int // maximum size for batch active int // number of batches being sent items []*files.UploadSessionFinishArg // current uncommitted files @@ -265,23 +267,51 @@ func (b *batcher) End(started bool) error { return b._commit(false) } +// Waits for the batch to complete - call with batchMu held +func (b *batcher) _waitForBatchResult(res *files.UploadSessionFinishBatchLaunch) (batchResult *files.UploadSessionFinishBatchResult, err error) { + if res.AsyncJobId == "" { + return res.Complete, nil + } + var batchStatus *files.UploadSessionFinishBatchJobStatus + sleepTime := time.Second + const maxTries = 120 + for try := 1; try <= maxTries; try++ { + err = b.f.pacer.Call(func() (bool, error) { + batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{ + AsyncJobId: res.AsyncJobId, + }) + return shouldRetry(err) + }) + if err != nil { + fs.Errorf(b.f, "failed to wait for batch: %v", err) + break + } + if batchStatus.Tag == "complete" { + break + } + fs.Debugf(b.f, "sleeping for %v to wait for batch to complete, try %d/%d", sleepTime, try, maxTries) + time.Sleep(sleepTime) + } + return batchStatus.Complete, nil +} + // commit a batch - call with batchMu held // // if finalizing is true then it doesn't unregister Finalize as this // causes a deadlock during finalization. func (b *batcher) _commit(finalizing bool) (err error) { + b.commitMu.Lock() batch := "batch" if finalizing { batch = "last batch" } fs.Debugf(b.f, "comitting %s length %d", batch, len(b.items)) - // FIXME this ignores the objects returned var arg = &files.UploadSessionFinishBatchArg{ Entries: b.items, } - //var res *file.UploadSessionFinishBatchLaunch + var res *files.UploadSessionFinishBatchLaunch err = b.f.pacer.Call(func() (bool, error) { - _, err = b.f.srv.UploadSessionFinishBatch(arg) + res, err = b.f.srv.UploadSessionFinishBatch(arg) // If error is insufficient space then don't retry if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace { @@ -293,14 +323,32 @@ func (b *batcher) _commit(finalizing bool) (err error) { return err != nil, err }) if err != nil { + b.commitMu.Unlock() return err } - // Show batches are empty + + // Clear batch b.items = nil - if !finalizing { - atexit.Unregister(b.atexit) - b.atexit = nil + + // If finalizing, don't unregister or get result + if finalizing { + b.commitMu.Unlock() + return nil } + + // Unregister the atexit since queue is empty + atexit.Unregister(b.atexit) + b.atexit = nil + + // Wait for the batch to finish before we proceed in the background + go func() { + defer b.commitMu.Unlock() + _, err = b._waitForBatchResult(res) + if err != nil { + fs.Errorf(b.f, "Error waiting for batch to finish: %v", err) + } + }() + return nil } @@ -1203,7 +1251,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size fs.Debugf(o, "Streaming chunk %d/%d", cur, cur) } else if chunks == 0 { fs.Debugf(o, "Streaming chunk %d/unknown", cur) - } else { + } else if chunks != 1 { fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks) } }