mirror of
https://github.com/rclone/rclone.git
synced 2025-01-20 03:32:44 +08:00
multithread: refactor multithread operation to use OpenChunkWriter if available #7056
If the feature OpenChunkWriter is not available, multithread tries to create an adapter from OpenWriterAt to OpenChunkWriter.
This commit is contained in:
parent
7701d1d33d
commit
181fecaec3
|
@ -9,13 +9,13 @@ import (
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
"github.com/rclone/rclone/fs/accounting"
|
"github.com/rclone/rclone/fs/accounting"
|
||||||
|
"github.com/rclone/rclone/lib/readers"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
multithreadChunkSize = 64 << 10
|
multithreadChunkSize = 64 << 10
|
||||||
multithreadChunkSizeMask = multithreadChunkSize - 1
|
|
||||||
multithreadReadBufferSize = 32 * 1024
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
|
// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
|
||||||
|
@ -60,7 +60,7 @@ func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
|
||||||
}
|
}
|
||||||
// ...destination doesn't support it
|
// ...destination doesn't support it
|
||||||
dstFeatures := f.Features()
|
dstFeatures := f.Features()
|
||||||
if dstFeatures.OpenWriterAt == nil {
|
if dstFeatures.OpenChunkWriter == nil && dstFeatures.OpenWriterAt == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// ...if --multi-thread-streams not in use and source and
|
// ...if --multi-thread-streams not in use and source and
|
||||||
|
@ -76,18 +76,17 @@ type multiThreadCopyState struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
partSize int64
|
partSize int64
|
||||||
size int64
|
size int64
|
||||||
wc fs.WriterAtCloser
|
|
||||||
src fs.Object
|
src fs.Object
|
||||||
acc *accounting.Account
|
acc *accounting.Account
|
||||||
streams int
|
streams int
|
||||||
|
numChunks int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy a single stream into place
|
// Copy a single stream into place
|
||||||
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) {
|
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writer fs.ChunkWriter) (err error) {
|
||||||
ci := fs.GetConfig(ctx)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.numChunks, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
start := int64(stream) * mc.partSize
|
start := int64(stream) * mc.partSize
|
||||||
|
@ -99,7 +98,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
|
||||||
end = mc.size
|
end = mc.size
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.numChunks, start, end, fs.SizeSuffix(end-start))
|
||||||
|
|
||||||
rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
|
rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -107,119 +106,99 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
|
||||||
}
|
}
|
||||||
defer fs.CheckClose(rc, &err)
|
defer fs.CheckClose(rc, &err)
|
||||||
|
|
||||||
var writer io.Writer = newOffsetWriter(mc.wc, start)
|
bytesWritten, err := writer.WriteChunk(stream, readers.NewRepeatableReader(rc))
|
||||||
if ci.MultiThreadWriteBufferSize > 0 {
|
|
||||||
writer = bufio.NewWriterSize(writer, int(ci.MultiThreadWriteBufferSize))
|
|
||||||
fs.Debugf(mc.src, "multi-thread copy: write buffer set to %v", ci.MultiThreadWriteBufferSize)
|
|
||||||
}
|
|
||||||
// Copy the data
|
|
||||||
buf := make([]byte, multithreadReadBufferSize)
|
|
||||||
offset := start
|
|
||||||
for {
|
|
||||||
// Check if context cancelled and exit if so
|
|
||||||
if mc.ctx.Err() != nil {
|
|
||||||
return mc.ctx.Err()
|
|
||||||
}
|
|
||||||
nr, er := rc.Read(buf)
|
|
||||||
if nr > 0 {
|
|
||||||
err = mc.acc.AccountRead(nr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("multipart copy: accounting failed: %w", err)
|
return err
|
||||||
}
|
}
|
||||||
nw, ew := writer.Write(buf[0:nr])
|
// FIXME: Wrap ReadSeeker for Accounting
|
||||||
if nw > 0 {
|
// However, to ensure reporting is correct, seeks have to be handled properly
|
||||||
offset += int64(nw)
|
errAccRead := mc.acc.AccountRead(int(bytesWritten))
|
||||||
}
|
if errAccRead != nil {
|
||||||
if ew != nil {
|
return errAccRead
|
||||||
return fmt.Errorf("multipart copy: write failed: %w", ew)
|
|
||||||
}
|
|
||||||
if nr != nw {
|
|
||||||
return fmt.Errorf("multipart copy: %w", io.ErrShortWrite)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if er != nil {
|
|
||||||
if er != io.EOF {
|
|
||||||
return fmt.Errorf("multipart copy: read failed: %w", er)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we were buffering, flush do disk
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.numChunks, start, end, fs.SizeSuffix(bytesWritten))
|
||||||
switch w := writer.(type) {
|
|
||||||
case *bufio.Writer:
|
|
||||||
er2 := w.Flush()
|
|
||||||
if er2 != nil {
|
|
||||||
return fmt.Errorf("multipart copy: flush failed: %w", er2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset != end {
|
|
||||||
return fmt.Errorf("multipart copy: wrote %d bytes but expected to write %d", offset-start, end-start)
|
|
||||||
}
|
|
||||||
|
|
||||||
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the chunk sizes and updated number of streams
|
// calculateNumChunks returns how many chunks of chunkSize bytes are needed to
// cover size bytes, i.e. the smallest n such that chunkSize * n >= size.
//
// A non-positive chunkSize cannot partition anything. Rather than panicking
// with a division by zero (or returning a negative count), the whole payload
// is reported as a single chunk when size is positive and as no chunks
// otherwise.
func calculateNumChunks(size int64, chunkSize int64) int {
	if chunkSize <= 0 {
		if size > 0 {
			return 1
		}
		return 0
	}
	numChunks := size / chunkSize
	// Round up so that a trailing partial chunk is counted.
	if size%chunkSize != 0 {
		numChunks++
	}
	return int(numChunks)
}
|
||||||
|
|
||||||
// Copy src to (f, remote) using streams download threads and the OpenWriterAt feature
|
// Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature
|
||||||
|
// and if that's not available it creates an adapter using OpenWriterAt
|
||||||
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) {
|
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) {
|
||||||
|
openChunkWriter := f.Features().OpenChunkWriter
|
||||||
|
ci := fs.GetConfig(ctx)
|
||||||
|
if openChunkWriter == nil {
|
||||||
openWriterAt := f.Features().OpenWriterAt
|
openWriterAt := f.Features().OpenWriterAt
|
||||||
if openWriterAt == nil {
|
if openWriterAt == nil {
|
||||||
return nil, errors.New("multi-thread copy: OpenWriterAt not supported")
|
return nil, errors.New("multi-part copy: neither OpenChunkWriter nor OpenWriterAt supported")
|
||||||
}
|
}
|
||||||
|
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
|
||||||
|
}
|
||||||
|
|
||||||
if src.Size() < 0 {
|
if src.Size() < 0 {
|
||||||
return nil, errors.New("multi-thread copy: can't copy unknown sized file")
|
return nil, fmt.Errorf("multi-thread copy: can't copy unknown sized file")
|
||||||
}
|
}
|
||||||
if src.Size() == 0 {
|
if src.Size() == 0 {
|
||||||
return nil, errors.New("multi-thread copy: can't copy zero sized file")
|
return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file")
|
||||||
}
|
}
|
||||||
|
|
||||||
g, gCtx := errgroup.WithContext(ctx)
|
g, gCtx := errgroup.WithContext(ctx)
|
||||||
|
chunkSize, chunkWriter, err := openChunkWriter(ctx, remote, src)
|
||||||
|
|
||||||
|
if chunkSize > src.Size() {
|
||||||
|
fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(chunkSize), fs.SizeSuffix(src.Size()))
|
||||||
|
chunkSize = src.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
numChunks := calculateNumChunks(src.Size(), chunkSize)
|
||||||
|
if streams > numChunks {
|
||||||
|
fs.Debugf(src, "multi-thread copy: number of streams '%d' was bigger than number of chunks '%d'", streams, numChunks)
|
||||||
|
streams = numChunks
|
||||||
|
}
|
||||||
|
|
||||||
mc := &multiThreadCopyState{
|
mc := &multiThreadCopyState{
|
||||||
ctx: gCtx,
|
ctx: gCtx,
|
||||||
size: src.Size(),
|
size: src.Size(),
|
||||||
src: src,
|
src: src,
|
||||||
|
partSize: chunkSize,
|
||||||
streams: streams,
|
streams: streams,
|
||||||
|
numChunks: numChunks,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("multipart copy: failed to open chunk writer: %w", err)
|
||||||
}
|
}
|
||||||
mc.calculateChunks()
|
|
||||||
|
|
||||||
// Make accounting
|
// Make accounting
|
||||||
mc.acc = tr.Account(ctx, nil)
|
mc.acc = tr.Account(ctx, nil)
|
||||||
|
|
||||||
// create write file handle
|
fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), mc.streams)
|
||||||
mc.wc, err = openWriterAt(gCtx, remote, mc.size)
|
sem := semaphore.NewWeighted(int64(mc.streams))
|
||||||
if err != nil {
|
for chunk := 0; chunk < mc.numChunks; chunk++ {
|
||||||
return nil, fmt.Errorf("multipart copy: failed to open destination: %w", err)
|
fs.Debugf(src, "Acquiring semaphore...")
|
||||||
|
if err := sem.Acquire(ctx, 1); err != nil {
|
||||||
|
fs.Errorf(src, "Failed to acquire semaphore: %v", err)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
currChunk := chunk
|
||||||
fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
|
|
||||||
for stream := 0; stream < mc.streams; stream++ {
|
|
||||||
stream := stream
|
|
||||||
g.Go(func() (err error) {
|
g.Go(func() (err error) {
|
||||||
return mc.copyStream(gCtx, stream)
|
defer sem.Release(1)
|
||||||
|
return mc.copyStream(gCtx, currChunk, chunkWriter)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
err = g.Wait()
|
err = g.Wait()
|
||||||
closeErr := mc.wc.Close()
|
closeErr := chunkWriter.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -232,13 +211,94 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
|
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if f.Features().PartialUploads {
|
||||||
err = obj.SetModTime(ctx, src.ModTime(ctx))
|
err = obj.SetModTime(ctx, src.ModTime(ctx))
|
||||||
switch err {
|
switch err {
|
||||||
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
|
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
|
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
|
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.numChunks, fs.SizeSuffix(mc.partSize))
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type writerAtChunkWriter struct {
|
||||||
|
ctx context.Context
|
||||||
|
remote string
|
||||||
|
size int64
|
||||||
|
writerAt fs.WriterAtCloser
|
||||||
|
chunkSize int64
|
||||||
|
chunks int
|
||||||
|
writeBufferSize int64
|
||||||
|
f fs.Fs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
|
||||||
|
fs.Debugf(w.remote, "writing chunk %v", chunkNumber)
|
||||||
|
|
||||||
|
bytesToWrite := w.chunkSize
|
||||||
|
if chunkNumber == (w.chunks-1) && w.size%w.chunkSize != 0 {
|
||||||
|
bytesToWrite = w.size % w.chunkSize
|
||||||
|
}
|
||||||
|
|
||||||
|
var writer io.Writer = newOffsetWriter(w.writerAt, int64(chunkNumber)*w.chunkSize)
|
||||||
|
if w.writeBufferSize > 0 {
|
||||||
|
writer = bufio.NewWriterSize(writer, int(w.writeBufferSize))
|
||||||
|
}
|
||||||
|
n, err := io.Copy(writer, reader)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
if n != bytesToWrite {
|
||||||
|
return -1, fmt.Errorf("expected to write %v bytes for chunk %v, but wrote %v bytes", bytesToWrite, chunkNumber, n)
|
||||||
|
}
|
||||||
|
// if we were buffering, flush do disk
|
||||||
|
switch w := writer.(type) {
|
||||||
|
case *bufio.Writer:
|
||||||
|
er2 := w.Flush()
|
||||||
|
if er2 != nil {
|
||||||
|
return -1, fmt.Errorf("multipart copy: flush failed: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w writerAtChunkWriter) Close() error {
|
||||||
|
return w.writerAt.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w writerAtChunkWriter) Abort() error {
|
||||||
|
obj, err := w.f.NewObject(w.ctx, w.remote)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err)
|
||||||
|
}
|
||||||
|
return obj.Remove(w.ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func openChunkWriterFromOpenWriterAt(openWriterAt func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error), chunkSize int64, writeBufferSize int64, f fs.Fs) func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
|
||||||
|
return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
|
||||||
|
writerAt, err := openWriterAt(ctx, remote, src.Size())
|
||||||
|
if err != nil {
|
||||||
|
return -1, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if writeBufferSize > 0 {
|
||||||
|
fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writeBufferSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkWriter := &writerAtChunkWriter{
|
||||||
|
ctx: ctx,
|
||||||
|
remote: remote,
|
||||||
|
size: src.Size(),
|
||||||
|
chunkSize: chunkSize,
|
||||||
|
chunks: calculateNumChunks(src.Size(), chunkSize),
|
||||||
|
writerAt: writerAt,
|
||||||
|
writeBufferSize: writeBufferSize,
|
||||||
|
f: f,
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunkSize, chunkWriter, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -86,27 +86,24 @@ func TestDoMultiThreadCopy(t *testing.T) {
|
||||||
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMultithreadCalculateChunks(t *testing.T) {
|
func TestMultithreadCalculateNumChunks(t *testing.T) {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
size int64
|
size int64
|
||||||
streams int
|
chunkSize int64
|
||||||
wantPartSize int64
|
wantNumChunks int
|
||||||
wantStreams int
|
|
||||||
}{
|
}{
|
||||||
{size: 1, streams: 10, wantPartSize: multithreadChunkSize, wantStreams: 1},
|
{size: 1, chunkSize: multithreadChunkSize, wantNumChunks: 1},
|
||||||
{size: 1 << 20, streams: 1, wantPartSize: 1 << 20, wantStreams: 1},
|
{size: 1 << 20, chunkSize: 1, wantNumChunks: 1 << 20},
|
||||||
{size: 1 << 20, streams: 2, wantPartSize: 1 << 19, wantStreams: 2},
|
{size: 1 << 20, chunkSize: 2, wantNumChunks: 1 << 19},
|
||||||
{size: (1 << 20) + 1, streams: 2, wantPartSize: (1 << 19) + multithreadChunkSize, wantStreams: 2},
|
{size: (1 << 20) + 1, chunkSize: 2, wantNumChunks: (1 << 19) + 1},
|
||||||
{size: (1 << 20) - 1, streams: 2, wantPartSize: (1 << 19), wantStreams: 2},
|
{size: (1 << 20) - 1, chunkSize: 2, wantNumChunks: 1 << 19},
|
||||||
} {
|
} {
|
||||||
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
|
||||||
mc := &multiThreadCopyState{
|
mc := &multiThreadCopyState{
|
||||||
size: test.size,
|
size: test.size,
|
||||||
streams: test.streams,
|
|
||||||
}
|
}
|
||||||
mc.calculateChunks()
|
mc.numChunks = calculateNumChunks(test.size, test.chunkSize)
|
||||||
assert.Equal(t, test.wantPartSize, mc.partSize)
|
assert.Equal(t, test.wantNumChunks, mc.numChunks)
|
||||||
assert.Equal(t, test.wantStreams, mc.streams)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user