s3: copy parts in parallel when doing chunked server side copy

Before this change rclone copied each chunk serially.

After this change it does --s3-upload-concurrency at once.

See: https://forum.rclone.org/t/transfer-big-files-50gb-from-s3-bucket-to-another-s3-bucket-doesnt-starts/43209
This commit is contained in:
Nick Craig-Wood 2023-12-01 10:30:44 +00:00
parent c16c22d6e1
commit 1f6271fa15

View File

@ -61,6 +61,7 @@ import (
"github.com/rclone/rclone/lib/rest"
"github.com/rclone/rclone/lib/version"
"golang.org/x/net/http/httpguts"
"golang.org/x/sync/errgroup"
)
// The S3 providers
@ -2185,10 +2186,10 @@ If empty it will default to the environment variable "AWS_PROFILE" or
Sensitive: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
Help: `Concurrency for multipart uploads and copies.
This is the number of chunks of the same file that are uploaded
concurrently.
concurrently for multipart uploads and copies.
If you are uploading small numbers of large files over high-speed links
and these uploads do not fully utilize your bandwidth, then increasing
@ -4507,10 +4508,20 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
fs.Debugf(src, "Starting multipart copy with %d parts", numParts)
var parts []*s3.CompletedPart
var (
parts = make([]*s3.CompletedPart, numParts)
g, gCtx = errgroup.WithContext(ctx)
)
g.SetLimit(f.opt.UploadConcurrency)
for partNum := int64(1); partNum <= numParts; partNum++ {
if err := f.pacer.Call(func() (bool, error) {
partNum := partNum
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
break
}
partNum := partNum // for closure
g.Go(func() error {
var uout *s3.UploadPartCopyOutput
uploadPartReq := &s3.UploadPartCopyInput{}
//structs.SetFrom(uploadPartReq, copyReq)
setFrom_s3UploadPartCopyInput_s3CopyObjectInput(uploadPartReq, copyReq)
@ -4519,18 +4530,24 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
uploadPartReq.PartNumber = &partNum
uploadPartReq.UploadId = uid
uploadPartReq.CopySourceRange = aws.String(calculateRange(partSize, partNum-1, numParts, srcSize))
uout, err := f.c.UploadPartCopyWithContext(ctx, uploadPartReq)
err := f.pacer.Call(func() (bool, error) {
uout, err = f.c.UploadPartCopyWithContext(gCtx, uploadPartReq)
return f.shouldRetry(gCtx, err)
})
if err != nil {
return f.shouldRetry(ctx, err)
return err
}
parts = append(parts, &s3.CompletedPart{
parts[partNum-1] = &s3.CompletedPart{
PartNumber: &partNum,
ETag: uout.CopyPartResult.ETag,
})
return false, nil
}); err != nil {
return err
}
}
return nil
})
}
err = g.Wait()
if err != nil {
return err
}
return f.pacer.Call(func() (bool, error) {