diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index e5357dc0b..d9e83be42 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -11,7 +11,6 @@ import ( "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/multipart" - "github.com/rclone/rclone/lib/readers" "golang.org/x/sync/errgroup" ) @@ -53,13 +52,13 @@ func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool { // state for a multi-thread copy type multiThreadCopyState struct { - ctx context.Context - partSize int64 - size int64 - src fs.Object - acc *accounting.Account - numChunks int - noSeek bool // set if sure the receiving fs won't seek the input + ctx context.Context + partSize int64 + size int64 + src fs.Object + acc *accounting.Account + numChunks int + noBuffering bool // set to read the input without buffering } // Copy a single chunk into place @@ -88,10 +87,11 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer defer fs.CheckClose(rc, &err) var rs io.ReadSeeker - if mc.noSeek { + if mc.noBuffering { // Read directly if we are sure we aren't going to seek // and account with accounting - rs = readers.NoSeeker{Reader: mc.acc.WrapStream(rc)} + rc.SetAccounting(mc.acc.AccountRead) + rs = rc } else { // Read the chunk into buffered reader rw := multipart.NewRW() @@ -130,15 +130,25 @@ func calculateNumChunks(size int64, chunkSize int64) int { func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption) (newDst fs.Object, err error) { openChunkWriter := f.Features().OpenChunkWriter ci := fs.GetConfig(ctx) - noseek := false + noBuffering := false if openChunkWriter == nil { openWriterAt := f.Features().OpenWriterAt if openWriterAt == nil { return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported") } openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f) - // We don't seek the chunks with OpenWriterAt - noseek = true + // If we are using OpenWriterAt we don't seek the chunks so don't need to buffer + fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt") + noBuffering = true + } else if src.Fs().Features().IsLocal { + // If the source fs is local we don't need to buffer + fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk") + noBuffering = true + } else if f.Features().ChunkWriterDoesntSeek { + // If the destination Fs promises not to seek its chunks + // (except for retries) then we don't need buffering. + fs.Debugf(src, "multi-thread copy: disabling buffering because destination has set ChunkWriterDoesntSeek") + noBuffering = true } if src.Size() < 0 { @@ -192,12 +202,12 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, g.SetLimit(concurrency) mc := &multiThreadCopyState{ - ctx: gCtx, - size: src.Size(), - src: src, - partSize: info.ChunkSize, - numChunks: numChunks, - noSeek: noseek, + ctx: gCtx, + size: src.Size(), + src: src, + partSize: info.ChunkSize, + numChunks: numChunks, + noBuffering: noBuffering, } // Make accounting