diff --git a/vfs/read.go b/vfs/read.go index 4f2cac54d..880314e5a 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -5,6 +5,8 @@ import ( "io" "os" "sync" + "sync/atomic" + "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -18,7 +20,8 @@ type ReadFileHandle struct { baseHandle done func(err error) mu sync.Mutex - closed bool // set if handle has been closed + cond *sync.Cond // cond lock for out of sequence reads + closed bool // set if handle has been closed r *accounting.Account readCalled bool // set if read has been called size int64 // size of the object (0 for unknown length) @@ -60,6 +63,7 @@ func newReadFileHandle(f *File) (*ReadFileHandle, error) { size: nonNegative(o.Size()), sizeUnknown: o.Size() < 0, } + fh.cond = sync.NewCond(&fh.mu) return fh, nil } @@ -221,6 +225,40 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF) return 0, ECLOSED } + maxBuf := 1024 * 1024 + if len(p) < maxBuf { + maxBuf = len(p) + } + if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) { + // Set a background timer so we don't wait for long + // Waits here potentially affect all seeks so need to keep them short + // This time here was made by finding the smallest when mounting a local backend + // that didn't cause seeks. + const maxWait = 5 * time.Millisecond + timeout := time.NewTimer(maxWait) + done := make(chan struct{}) + abort := int32(0) + go func() { + select { + case <-timeout.C: + // set abort flag an give all the waiting goroutines a kick on timeout + atomic.StoreInt32(&abort, 1) + fs.Debugf(fh.remote, "aborting in-sequence read wait, off=%d", off) + fh.cond.Broadcast() + case <-done: + } + }() + for fh.offset != off && atomic.LoadInt32(&abort) == 0 { + fs.Debugf(fh.remote, "waiting for in-sequence read to %d for %v", off, maxWait) + fh.cond.Wait() + } + // tidy up end timer + close(done) + timeout.Stop() + if fh.offset != off { + fs.Debugf(fh.remote, "failed to wait for in-sequence read to %d", off) + } + } doSeek := off != fh.offset if doSeek && fh.noSeek { return 0, ESPIPE @@ -292,6 +330,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { err = io.EOF } } + fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read return n, err }