diff --git a/vfs/write.go b/vfs/write.go index 68ad2b7b9..7939a4823 100644 --- a/vfs/write.go +++ b/vfs/write.go @@ -5,6 +5,7 @@ import ( "io" "os" "sync" + "sync/atomic" "time" "github.com/rclone/rclone/fs" @@ -15,7 +16,8 @@ import ( type WriteFileHandle struct { baseHandle mu sync.Mutex - closed bool // set if handle has been closed + cond *sync.Cond // cond lock for out of sequence writes + closed bool // set if handle has been closed remote string pipeWriter *io.PipeWriter o fs.Object @@ -42,6 +44,7 @@ func newWriteFileHandle(d *Dir, f *File, remote string, flags int) (*WriteFileHa result: make(chan error, 1), file: f, } + fh.cond = sync.NewCond(&fh.mu) fh.file.addWriter(fh) return fh, nil } @@ -127,14 +130,28 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) { fs.Errorf(fh.remote, "WriteFileHandle.Write: error: %v", EBADF) return 0, ECLOSED } - // Wait a short time for sequential writes to appear - const maxTries = 1000 - const sleepTime = 1 * time.Millisecond - for try := 1; fh.offset != off && try <= maxTries; try++ { - //fs.Debugf(fh.remote, "waiting for in sequence write %d/%d", try, maxTries) - fh.mu.Unlock() - time.Sleep(sleepTime) - fh.mu.Lock() + if fh.offset != off { + // Set a background timer so we don't wait forever + timeout := time.NewTimer(10 * time.Second) + done := make(chan struct{}) + abort := int32(0) + go func() { + select { + case <-timeout.C: + // set abort flag an give all the waiting goroutines a kick on timeout + atomic.StoreInt32(&abort, 1) + fh.cond.Broadcast() + case <-done: + } + }() + // Wait for an in-sequence write or abort + for fh.offset != off && atomic.LoadInt32(&abort) == 0 { + // fs.Debugf(fh.remote, "waiting for in-sequence write to %d", off) + fh.cond.Wait() + } + // tidy up end timer + close(done) + timeout.Stop() } if fh.offset != off { fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes") @@ -152,6 +169,7 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) { return 0, err } // fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) + fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read return n, nil }