package vfs import ( "context" "io" "os" "sync" "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/chunkedreader" "github.com/rclone/rclone/fs/hash" ) // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { baseHandle done func(err error) mu sync.Mutex cond *sync.Cond // cond lock for out of sequence reads closed bool // set if handle has been closed r *accounting.Account readCalled bool // set if read has been called size int64 // size of the object (0 for unknown length) offset int64 // offset of read of o roffset int64 // offset of Read() calls noSeek bool sizeUnknown bool // set if size of source is not known file *File hash *hash.MultiHasher opened bool remote string } // Check interfaces var ( _ io.Reader = (*ReadFileHandle)(nil) _ io.ReaderAt = (*ReadFileHandle)(nil) _ io.Seeker = (*ReadFileHandle)(nil) _ io.Closer = (*ReadFileHandle)(nil) ) func newReadFileHandle(f *File) (*ReadFileHandle, error) { var mhash *hash.MultiHasher var err error o := f.getObject() if !f.VFS().Opt.NoChecksum { hashes := hash.NewHashSet(o.Fs().Hashes().GetOne()) // just pick one hash mhash, err = hash.NewMultiHasherTypes(hashes) if err != nil { fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err) } } fh := &ReadFileHandle{ remote: o.Remote(), noSeek: f.VFS().Opt.NoSeek, file: f, hash: mhash, size: nonNegative(o.Size()), sizeUnknown: o.Size() < 0, } fh.cond = sync.NewCond(&fh.mu) return fh, nil } // openPending opens the file if there is a pending open // call with the lock held func (fh *ReadFileHandle) openPending() (err error) { if fh.opened { return nil } o := fh.file.getObject() r, err := chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)).Open() if err != nil { return err } tr := accounting.GlobalStats().NewTransfer(o) fh.done = tr.Done fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer fh.opened = true return nil } // String converts it to printable func (fh *ReadFileHandle) String() string { if fh == nil { return "" } fh.mu.Lock() defer fh.mu.Unlock() if fh.file == nil { return "" } return fh.file.String() + " (r)" } // Node returns the Node assocuated with this - satisfies Noder interface func (fh *ReadFileHandle) Node() Node { fh.mu.Lock() defer fh.mu.Unlock() return fh.file } // seek to a new offset // // if reopen is true, then we won't attempt to use an io.Seeker interface // // Must be called with fh.mu held func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { if fh.noSeek { return ESPIPE } fh.hash = nil if !reopen { ar := fh.r.GetAsyncReader() // try to fulfill the seek with buffer discard if ar != nil && ar.SkipBytes(int(offset-fh.offset)) { fh.offset = offset return nil } } fh.r.StopBuffering() // stop the background reading first oldReader := fh.r.GetReader() r, ok := oldReader.(*chunkedreader.ChunkedReader) if !ok { fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader) reopen = true } if !reopen { fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset) _, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1) if err != nil { fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err) return err } } else { fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset) // close old one err = oldReader.Close() if err != nil { fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err) } // re-open with a seek o := fh.file.getObject() r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)) _, err := r.Seek(offset, 0) if err != nil { fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) return err } r, err = r.Open() if err != nil { fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) return err } } fh.r.UpdateReader(context.TODO(), r) fh.offset = offset return nil } // Seek the file - returns ESPIPE if seeking isn't possible func (fh *ReadFileHandle) Seek(offset int64, whence int) (n int64, err error) { fh.mu.Lock() defer fh.mu.Unlock() if fh.noSeek { return 0, ESPIPE } size := fh.size switch whence { case io.SeekStart: fh.roffset = 0 case io.SeekEnd: fh.roffset = size } fh.roffset += offset // we don't check the offset - the next Read will return fh.roffset, nil } // ReadAt reads len(p) bytes into p starting at offset off in the // underlying input source. It returns the number of bytes read (0 <= // n <= len(p)) and any error encountered. // // When ReadAt returns n < len(p), it returns a non-nil error // explaining why more bytes were not returned. In this respect, // ReadAt is stricter than Read. // // Even if ReadAt returns n < len(p), it may use all of p as scratch // space during the call. If some data is available but not len(p) // bytes, ReadAt blocks until either all the data is available or an // error occurs. In this respect ReadAt is different from Read. // // If the n = len(p) bytes returned by ReadAt are at the end of the // input source, ReadAt may return either err == EOF or err == nil. // // If ReadAt is reading from an input source with a seek offset, // ReadAt should not affect nor be affected by the underlying seek // offset. // // Clients of ReadAt can execute parallel ReadAt calls on the same // input source. // // Implementations must not retain p. func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) { fh.mu.Lock() defer fh.mu.Unlock() return fh.readAt(p, off) } // This waits for *poff to equal off or aborts after the timeout. // // Waits here potentially affect all seeks so need to keep them short // // Call with fh.mu Locked func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) { var ( timeout = time.NewTimer(maxWait) done = make(chan struct{}) abort = false ) go func() { select { case <-timeout.C: // take the lock to make sure that cond.Wait() is called before // cond.Broadcast. NB cond.L == mu cond.L.Lock() // set abort flag and give all the waiting goroutines a kick on timeout abort = true fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off) cond.Broadcast() cond.L.Unlock() case <-done: } }() for *poff != off && !abort { fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait) cond.Wait() } // tidy up end timer close(done) timeout.Stop() if *poff != off { fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off) } } // Implementation of ReadAt - call with lock held func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { // defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err) err = fh.openPending() // FIXME pending open could be more efficient in the presence of seek (and retries) if err != nil { return 0, err } // fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off) if fh.closed { fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF) return 0, ECLOSED } maxBuf := 1024 * 1024 if len(p) < maxBuf { maxBuf = len(p) } if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) { waitSequential("read", fh.remote, fh.cond, fh.file.VFS().Opt.ReadWait, &fh.offset, off) } doSeek := off != fh.offset if doSeek && fh.noSeek { return 0, ESPIPE } var newOffset int64 retries := 0 reqSize := len(p) doReopen := false for { if doSeek { // Are we attempting to seek beyond the end of the // file - if so just return EOF leaving the underlying // file in an unchanged state. if off >= fh.size { fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size) return 0, io.EOF } // Otherwise do the seek err = fh.seek(off, doReopen) } else { err = nil } if err == nil { if reqSize > 0 { fh.readCalled = true } n, err = io.ReadFull(fh.r, p) newOffset = fh.offset + int64(n) // if err == nil && rand.Intn(10) == 0 { // err = errors.New("random error") // } if err == nil { break } else if (err == io.ErrUnexpectedEOF || err == io.EOF) && (newOffset == fh.size || fh.sizeUnknown) { if fh.sizeUnknown { // size is now known since we have read to the end fh.sizeUnknown = false fh.size = newOffset } // Have read to end of file - reset error err = nil break } } if retries >= fs.Config.LowLevelRetries { break } retries++ fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) doSeek = true doReopen = true } if err != nil { fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err) } else { fh.offset = newOffset // fs.Debugf(fh.remote, "ReadFileHandle.Read OK") if fh.hash != nil { _, err = fh.hash.Write(p[:n]) if err != nil { fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err) return 0, err } } // If we have no error and we didn't fill the buffer, must be EOF if n != len(p) { err = io.EOF } } fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read return n, err } func (fh *ReadFileHandle) checkHash() error { if fh.hash == nil || !fh.readCalled || fh.offset < fh.size { return nil } o := fh.file.getObject() for hashType, dstSum := range fh.hash.Sums() { srcSum, err := o.Hash(context.TODO(), hashType) if err != nil { if os.IsNotExist(errors.Cause(err)) { // if it was file not found then at // this point we don't care any more continue } return err } if !hash.Equals(dstSum, srcSum) { return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum) } } return nil } // Read reads up to len(p) bytes into p. It returns the number of bytes read (0 // <= n <= len(p)) and any error encountered. Even if Read returns n < len(p), // it may use all of p as scratch space during the call. If some data is // available but not len(p) bytes, Read conventionally returns what is // available instead of waiting for more. // // When Read encounters an error or end-of-file condition after successfully // reading n > 0 bytes, it returns the number of bytes read. It may return the // (non-nil) error from the same call or return the error (and n == 0) from a // subsequent call. An instance of this general case is that a Reader returning // a non-zero number of bytes at the end of the input stream may return either // err == EOF or err == nil. The next Read should return 0, EOF. // // Callers should always process the n > 0 bytes returned before considering // the error err. Doing so correctly handles I/O errors that happen after // reading some bytes and also both of the allowed EOF behaviors. // // Implementations of Read are discouraged from returning a zero byte count // with a nil error, except when len(p) == 0. Callers should treat a return of // 0 and nil as indicating that nothing happened; in particular it does not // indicate EOF. // // Implementations must not retain p. func (fh *ReadFileHandle) Read(p []byte) (n int, err error) { fh.mu.Lock() defer fh.mu.Unlock() if fh.roffset >= fh.size && !fh.sizeUnknown { return 0, io.EOF } n, err = fh.readAt(p, fh.roffset) fh.roffset += int64(n) return n, err } // close the file handle returning EBADF if it has been // closed already. // // Must be called with fh.mu held func (fh *ReadFileHandle) close() error { if fh.closed { return ECLOSED } fh.closed = true if fh.opened { var err error defer func() { fh.done(err) }() // Close first so that we have hashes err = fh.r.Close() if err != nil { return err } // Now check the hash err = fh.checkHash() if err != nil { return err } } return nil } // Close closes the file func (fh *ReadFileHandle) Close() error { fh.mu.Lock() defer fh.mu.Unlock() return fh.close() } // Flush is called each time the file or directory is closed. // Because there can be multiple file descriptors referring to a // single opened file, Flush can be called multiple times. func (fh *ReadFileHandle) Flush() error { fh.mu.Lock() defer fh.mu.Unlock() if !fh.opened { return nil } // fs.Debugf(fh.remote, "ReadFileHandle.Flush") if err := fh.checkHash(); err != nil { fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err) return err } // fs.Debugf(fh.remote, "ReadFileHandle.Flush OK") return nil } // Release is called when we are finished with the file handle // // It isn't called directly from userspace so the error is ignored by // the kernel func (fh *ReadFileHandle) Release() error { fh.mu.Lock() defer fh.mu.Unlock() if !fh.opened { return nil } if fh.closed { fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do") return nil } fs.Debugf(fh.remote, "ReadFileHandle.Release closing") err := fh.close() if err != nil { fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err) } else { // fs.Debugf(fh.remote, "ReadFileHandle.Release OK") } return err } // Size returns the size of the underlying file func (fh *ReadFileHandle) Size() int64 { fh.mu.Lock() defer fh.mu.Unlock() return fh.size } // Stat returns info about the file func (fh *ReadFileHandle) Stat() (os.FileInfo, error) { fh.mu.Lock() defer fh.mu.Unlock() return fh.file, nil }