From fb61ed8506e68db0ea19c62a08daa9cdee59dda5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 29 May 2020 12:36:31 +0100 Subject: [PATCH] b2: Implement server side copy for files > 5GB - fixes #3991 This factors copy out of SetModTime and Copy so it can be called from both places. This also reworks all the multipart uploading to use sync.Errgroup and memory pooling like the other backends. This makes it more memory efficient and handle errors better. See: https://forum.rclone.org/t/copying-files-within-a-b2-bucket/16680/10 --- backend/b2/api/types.go | 8 + backend/b2/b2.go | 204 ++++++++++++++--------- backend/b2/upload.go | 355 +++++++++++++++++++++++----------------- 3 files changed, 339 insertions(+), 228 deletions(-) diff --git a/backend/b2/api/types.go b/backend/b2/api/types.go index fcf0c9834..a9d6af2b1 100644 --- a/backend/b2/api/types.go +++ b/backend/b2/api/types.go @@ -337,3 +337,11 @@ type CopyFileRequest struct { Info map[string]string `json:"fileInfo,omitempty"` // This field stores the metadata that will be stored with the file. (REPLACE only) DestBucketID string `json:"destinationBucketId,omitempty"` // The destination ID of the bucket if set, if not the source bucket will be used } + +// CopyPartRequest is the request for b2_copy_part - the response is UploadPartResponse +type CopyPartRequest struct { + SourceID string `json:"sourceFileId"` // The ID of the source file being copied. + LargeFileID string `json:"largeFileId"` // The ID of the large file the part will belong to, as returned by b2_start_large_file. + PartNumber int64 `json:"partNumber"` // Which part this is (starting from 1) + Range string `json:"range,omitempty"` // The range of bytes to copy. If not provided, the whole source file will be copied. +} diff --git a/backend/b2/b2.go b/backend/b2/b2.go index bddb09ca0..df8e9fc3f 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -33,6 +33,7 @@ import ( "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/rest" ) @@ -54,6 +55,9 @@ const ( minChunkSize = 5 * fs.MebiByte defaultChunkSize = 96 * fs.MebiByte defaultUploadCutoff = 200 * fs.MebiByte + largeFileCopyCutoff = 4 * fs.GibiByte // 5E9 is the max + memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long + memoryPoolUseMmap = false ) // Globals @@ -113,6 +117,16 @@ Files above this size will be uploaded in chunks of "--b2-chunk-size". This value should be set no larger than 4.657GiB (== 5GB).`, Default: defaultUploadCutoff, Advanced: true, + }, { + Name: "copy_cutoff", + Help: `Cutoff for switching to multipart copy + +Any files larger than this that need to be server side copied will be +copied in chunks of this size. + +The minimum is 0 and the maximum is 4.6GB.`, + Default: largeFileCopyCutoff, + Advanced: true, }, { Name: "chunk_size", Help: `Upload chunk size. Must fit in memory. @@ -150,6 +164,18 @@ The duration before the download authorization token will expire. The minimum value is 1 second. The maximum value is one week.`, Default: fs.Duration(7 * 24 * time.Hour), Advanced: true, + }, { + Name: "memory_pool_flush_time", + Default: memoryPoolFlushTime, + Advanced: true, + Help: `How often internal memory buffer pools will be flushed. +Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. +This option controls how often unused buffers will be removed from the pool.`, + }, { + Name: "memory_pool_use_mmap", + Default: memoryPoolUseMmap, + Advanced: true, + Help: `Whether to use mmap buffers in internal memory pool.`, }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -173,10 +199,13 @@ type Options struct { Versions bool `config:"versions"` HardDelete bool `config:"hard_delete"` UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` + CopyCutoff fs.SizeSuffix `config:"copy_cutoff"` ChunkSize fs.SizeSuffix `config:"chunk_size"` DisableCheckSum bool `config:"disable_checksum"` DownloadURL string `config:"download_url"` DownloadAuthorizationDuration fs.Duration `config:"download_auth_duration"` + MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` + MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` Enc encoder.MultiEncoder `config:"encoding"` } @@ -199,7 +228,8 @@ type Fs struct { uploads map[string][]*api.GetUploadURLResponse // Upload URLs by buckedID authMu sync.Mutex // lock for authorizing the account pacer *fs.Pacer // To pace and retry the API calls - bufferTokens chan []byte // control concurrency of multipart uploads + uploadToken *pacer.TokenDispenser // control concurrency + pool *pool.Pool // memory pool } // Object describes a b2 object @@ -335,7 +365,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) err = checkUploadChunkSize(cs) if err == nil { old, f.opt.ChunkSize = f.opt.ChunkSize, cs - f.fillBufferTokens() // reset the buffer tokens } return } @@ -396,6 +425,13 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { _bucketType: make(map[string]string, 1), uploads: make(map[string][]*api.GetUploadURLResponse), pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), + uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), + pool: pool.New( + time.Duration(opt.MemoryPoolFlushTime), + int(opt.ChunkSize), + fs.Config.Transfers, + opt.MemoryPoolUseMmap, + ), } f.setRoot(root) f.features = (&fs.Features{ @@ -410,7 +446,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { f.srv.SetHeader(testModeHeader, testMode) fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) } - f.fillBufferTokens() err = f.authorizeAccount(ctx) if err != nil { return nil, errors.Wrap(err, "failed to authorize account") @@ -533,32 +568,25 @@ func (f *Fs) clearUploadURL(bucketID string) { f.uploadMu.Unlock() } -// Fill up (or reset) the buffer tokens -func (f *Fs) fillBufferTokens() { - f.bufferTokens = make(chan []byte, fs.Config.Transfers) - for i := 0; i < fs.Config.Transfers; i++ { - f.bufferTokens <- nil +// getBuf gets a buffer of f.opt.ChunkSize and an upload token +// +// If noBuf is set then it just gets an upload token +func (f *Fs) getBuf(noBuf bool) (buf []byte) { + f.uploadToken.Get() + if !noBuf { + buf = f.pool.Get() } -} - -// getUploadBlock gets a block from the pool of size chunkSize -func (f *Fs) getUploadBlock() []byte { - buf := <-f.bufferTokens - if buf == nil { - buf = make([]byte, f.opt.ChunkSize) - } - // fs.Debugf(f, "Getting upload block %p", buf) return buf } -// putUploadBlock returns a block to the pool of size chunkSize -func (f *Fs) putUploadBlock(buf []byte) { - buf = buf[:cap(buf)] - if len(buf) != int(f.opt.ChunkSize) { - panic("bad blocksize returned to pool") +// putBuf returns a buffer to the memory pool and an upload token +// +// If noBuf is set then it just returns the upload token +func (f *Fs) putBuf(buf []byte, noBuf bool) { + if !noBuf { + f.pool.Put(buf) } - // fs.Debugf(f, "Returning upload block %p", buf) - f.bufferTokens <- buf + f.uploadToken.Put() } // Return an Object from a path @@ -1205,6 +1233,63 @@ func (f *Fs) CleanUp(ctx context.Context) error { return f.purge(ctx, f.rootBucket, f.rootDirectory, true) } +// copy does a server side copy from dstObj <- srcObj +// +// If newInfo is nil then the metadata will be copied otherwise it +// will be replaced with newInfo +func (f *Fs) copy(ctx context.Context, dstObj *Object, srcObj *Object, newInfo *api.File) (err error) { + if srcObj.size >= int64(f.opt.CopyCutoff) { + if newInfo == nil { + newInfo, err = srcObj.getMetaData(ctx) + if err != nil { + return err + } + } + up, err := f.newLargeUpload(ctx, dstObj, nil, srcObj, f.opt.CopyCutoff, true, newInfo) + if err != nil { + return err + } + return up.Upload(ctx) + } + + dstBucket, dstPath := dstObj.split() + err = f.makeBucket(ctx, dstBucket) + if err != nil { + return err + } + + destBucketID, err := f.getBucketID(ctx, dstBucket) + if err != nil { + return err + } + + opts := rest.Opts{ + Method: "POST", + Path: "/b2_copy_file", + } + var request = api.CopyFileRequest{ + SourceID: srcObj.id, + Name: f.opt.Enc.FromStandardPath(dstPath), + DestBucketID: destBucketID, + } + if newInfo == nil { + request.MetadataDirective = "COPY" + } else { + request.MetadataDirective = "REPLACE" + request.ContentType = newInfo.ContentType + request.Info = newInfo.Info + } + var response api.FileInfo + err = f.pacer.Call(func() (bool, error) { + resp, err := f.srv.CallJSON(ctx, &opts, &request, &response) + return f.shouldRetry(ctx, resp, err) + }) + if err != nil { + return err + } + return dstObj.decodeMetaDataFileInfo(&response) +} + // Copy src to this remote using server side copy operations. // // This is stored with the remote path given @@ -1215,47 +1300,21 @@ func (f *Fs) CleanUp(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { - dstBucket, dstPath := f.split(remote) - err := f.makeBucket(ctx, dstBucket) - if err != nil { - return nil, err - } srcObj, ok := src.(*Object) if !ok { fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - destBucketID, err := f.getBucketID(ctx, dstBucket) - if err != nil { - return nil, err - } - opts := rest.Opts{ - Method: "POST", - Path: "/b2_copy_file", - } - var request = api.CopyFileRequest{ - SourceID: srcObj.id, - Name: f.opt.Enc.FromStandardPath(dstPath), - MetadataDirective: "COPY", - DestBucketID: destBucketID, - } - var response api.FileInfo - err = f.pacer.Call(func() (bool, error) { - resp, err := f.srv.CallJSON(ctx, &opts, &request, &response) - return f.shouldRetry(ctx, resp, err) - }) - if err != nil { - return nil, err - } - o := &Object{ + // Temporary Object under construction + dstObj := &Object{ fs: f, remote: remote, } - err = o.decodeMetaDataFileInfo(&response) + err := f.copy(ctx, dstObj, srcObj, nil) if err != nil { return nil, err } - return o, nil + return dstObj, nil } // Hashes returns the supported hash sets. @@ -1526,28 +1585,10 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { if err != nil { return err } - _, bucketPath := o.split() info.Info[timeKey] = timeString(modTime) - opts := rest.Opts{ - Method: "POST", - Path: "/b2_copy_file", - } - var request = api.CopyFileRequest{ - SourceID: o.id, - Name: o.fs.opt.Enc.FromStandardPath(bucketPath), // copy to same name - MetadataDirective: "REPLACE", - ContentType: info.ContentType, - Info: info.Info, - } - var response api.FileInfo - err = o.fs.pacer.Call(func() (bool, error) { - resp, err := o.fs.srv.CallJSON(ctx, &opts, &request, &response) - return o.fs.shouldRetry(ctx, resp, err) - }) - if err != nil { - return err - } - return o.decodeMetaDataFileInfo(&response) + + // Copy to the same name, overwriting the metadata only + return o.fs.copy(ctx, o, o, info) } // Storable returns if this object is storable @@ -1723,7 +1764,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } if size == -1 { // Check if the file is large enough for a chunked upload (needs to be at least two chunks) - buf := o.fs.getUploadBlock() + buf := o.fs.getBuf(false) + n, err := io.ReadFull(in, buf) if err == nil { bufReader := bufio.NewReader(in) @@ -1733,22 +1775,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if err == nil { fs.Debugf(o, "File is big enough for chunked streaming") - up, err := o.fs.newLargeUpload(ctx, o, in, src) + up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil) if err != nil { - o.fs.putUploadBlock(buf) + o.fs.putBuf(buf, false) return err } + // NB Stream returns the buffer and token return up.Stream(ctx, buf) } else if err == io.EOF || err == io.ErrUnexpectedEOF { fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n) - defer o.fs.putUploadBlock(buf) + defer o.fs.putBuf(buf, false) size = int64(n) in = bytes.NewReader(buf[:n]) } else { + o.fs.putBuf(buf, false) return err } } else if size > int64(o.fs.opt.UploadCutoff) { - up, err := o.fs.newLargeUpload(ctx, o, in, src) + up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil) if err != nil { return err } diff --git a/backend/b2/upload.go b/backend/b2/upload.go index d390c340b..bc179c714 100644 --- a/backend/b2/upload.go +++ b/backend/b2/upload.go @@ -21,6 +21,7 @@ import ( "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/rest" + "golang.org/x/sync/errgroup" ) type hashAppendingReader struct { @@ -68,20 +69,26 @@ func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader { // largeUpload is used to control the upload of large files which need chunking type largeUpload struct { - f *Fs // parent Fs - o *Object // object being uploaded - in io.Reader // read the data from here - wrap accounting.WrapFn // account parts being transferred - id string // ID of the file being uploaded - size int64 // total size - parts int64 // calculated number of parts, if known - sha1s []string // slice of SHA1s for each part - uploadMu sync.Mutex // lock for upload variable - uploads []*api.GetUploadPartURLResponse // result of get upload URL calls + f *Fs // parent Fs + o *Object // object being uploaded + doCopy bool // doing copy rather than upload + what string // text name of operation for logs + in io.Reader // read the data from here + wrap accounting.WrapFn // account parts being transferred + id string // ID of the file being uploaded + size int64 // total size + parts int64 // calculated number of parts, if known + sha1s []string // slice of SHA1s for each part + uploadMu sync.Mutex // lock for upload variable + uploads []*api.GetUploadPartURLResponse // result of get upload URL calls + chunkSize int64 // chunk size to use + src *Object // if copying, object we are reading from } // newLargeUpload starts an upload of object o from in with metadata in src -func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { +// +// If newInfo is set then metadata from that will be used instead of reading it from src +func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, chunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) { remote := o.remote size := src.Size() parts := int64(0) @@ -89,8 +96,8 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs if size == -1 { fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize) } else { - parts = size / int64(o.fs.opt.ChunkSize) - if size%int64(o.fs.opt.ChunkSize) != 0 { + parts = size / int64(chunkSize) + if size%int64(chunkSize) != 0 { parts++ } if parts > maxParts { @@ -99,7 +106,6 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs sha1SliceSize = parts } - modTime := src.ModTime(ctx) opts := rest.Opts{ Method: "POST", Path: "/b2_start_large_file", @@ -110,18 +116,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs return nil, err } var request = api.StartLargeFileRequest{ - BucketID: bucketID, - Name: f.opt.Enc.FromStandardPath(bucketPath), - ContentType: fs.MimeType(ctx, src), - Info: map[string]string{ - timeKey: timeString(modTime), - }, + BucketID: bucketID, + Name: f.opt.Enc.FromStandardPath(bucketPath), } - // Set the SHA1 if known - if !o.fs.opt.DisableCheckSum { - if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" { - request.Info[sha1Key] = calculatedSha1 + if newInfo == nil { + modTime := src.ModTime(ctx) + request.ContentType = fs.MimeType(ctx, src) + request.Info = map[string]string{ + timeKey: timeString(modTime), } + // Set the SHA1 if known + if !o.fs.opt.DisableCheckSum || doCopy { + if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" { + request.Info[sha1Key] = calculatedSha1 + } + } + } else { + request.ContentType = newInfo.ContentType + request.Info = newInfo.Info } var response api.StartLargeFileResponse err = f.pacer.Call(func() (bool, error) { @@ -131,18 +143,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs if err != nil { return nil, err } + up = &largeUpload{ + f: f, + o: o, + doCopy: doCopy, + what: "upload", + id: response.ID, + size: size, + parts: parts, + sha1s: make([]string, sha1SliceSize), + chunkSize: int64(chunkSize), + } // unwrap the accounting from the input, we use wrap to put it // back on after the buffering - in, wrap := accounting.UnWrap(in) - up = &largeUpload{ - f: f, - o: o, - in: in, - wrap: wrap, - id: response.ID, - size: size, - parts: parts, - sha1s: make([]string, sha1SliceSize), + if doCopy { + up.what = "copy" + up.src = src.(*Object) + } else { + up.in, up.wrap = accounting.UnWrap(in) } return up, nil } @@ -256,9 +274,41 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt return err } +// Copy a chunk +func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64) error { + err := up.f.pacer.Call(func() (bool, error) { + fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize) + opts := rest.Opts{ + Method: "POST", + Path: "/b2_copy_part", + } + offset := (part - 1) * up.chunkSize // where we are in the source file + var request = api.CopyPartRequest{ + SourceID: up.src.id, + LargeFileID: up.id, + PartNumber: part, + Range: fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1), + } + var response api.UploadPartResponse + resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response) + retry, err := up.f.shouldRetry(ctx, resp, err) + if err != nil { + fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err) + } + up.sha1s[part-1] = response.SHA1 + return retry, err + }) + if err != nil { + fs.Debugf(up.o, "Error copying chunk %d: %v", part, err) + } else { + fs.Debugf(up.o, "Done copying chunk %d", part) + } + return err +} + // finish closes off the large upload func (up *largeUpload) finish(ctx context.Context) error { - fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts) + fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts) opts := rest.Opts{ Method: "POST", Path: "/b2_finish_large_file", @@ -295,136 +345,145 @@ func (up *largeUpload) cancel(ctx context.Context) error { return err } -func (up *largeUpload) managedTransferChunk(ctx context.Context, wg *sync.WaitGroup, errs chan error, part int64, buf []byte) { - wg.Add(1) - go func(part int64, buf []byte) { - defer wg.Done() - defer up.f.putUploadBlock(buf) - err := up.transferChunk(ctx, part, buf) - if err != nil { - select { - case errs <- err: - default: - } - } - }(part, buf) -} - -func (up *largeUpload) finishOrCancelOnError(ctx context.Context, err error, errs chan error) error { - if err == nil { - select { - case err = <-errs: - default: - } +// If the error pointer is not nil then cancel the transfer +func (up *largeUpload) cancelOnError(ctx context.Context, err *error) { + if *err == nil { + return } - if err != nil { - fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err) - cancelErr := up.cancel(ctx) - if cancelErr != nil { - fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr) - } - return err + fs.Debugf(up.o, "Cancelling large file %s due to error: %v", up.what, *err) + cancelErr := up.cancel(ctx) + if cancelErr != nil { + fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, cancelErr) } - return up.finish(ctx) } // Stream uploads the chunks from the input, starting with a required initial // chunk. Assumes the file size is unknown and will upload until the input // reaches EOF. +// +// Note that initialUploadBlock must be returned to f.putBuf() func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) { + defer up.cancelOnError(ctx, &err) fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id) - errs := make(chan error, 1) - hasMoreParts := true - var wg sync.WaitGroup - - // Transfer initial chunk + var ( + g, gCtx = errgroup.WithContext(ctx) + hasMoreParts = true + ) up.size = int64(len(initialUploadBlock)) - up.managedTransferChunk(ctx, &wg, errs, 1, initialUploadBlock) + g.Go(func() error { + for part := int64(1); hasMoreParts; part++ { + // Get a block of memory from the pool and token which limits concurrency. + var buf []byte + if part == 1 { + buf = initialUploadBlock + } else { + buf = up.f.getBuf(false) + } -outer: - for part := int64(2); hasMoreParts; part++ { - // Check any errors - select { - case err = <-errs: - break outer - default: + // Fail fast, in case an errgroup managed function returns an error + // gCtx is cancelled. There is no point in uploading all the other parts. + if gCtx.Err() != nil { + up.f.putBuf(buf, false) + return nil + } + + // Read the chunk + var n int + if part == 1 { + n = len(buf) + } else { + n, err = io.ReadFull(up.in, buf) + if err == io.ErrUnexpectedEOF { + fs.Debugf(up.o, "Read less than a full chunk, making this the last one.") + buf = buf[:n] + hasMoreParts = false + } else if err == io.EOF { + fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.") + up.f.putBuf(buf, false) + return nil + } else if err != nil { + // other kinds of errors indicate failure + up.f.putBuf(buf, false) + return err + } + } + + // Keep stats up to date + up.parts = part + up.size += int64(n) + if part > maxParts { + up.f.putBuf(buf, false) + return errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts) + } + + part := part // for the closure + g.Go(func() (err error) { + defer up.f.putBuf(buf, false) + return up.transferChunk(gCtx, part, buf) + }) } - - // Get a block of memory - buf := up.f.getUploadBlock() - - // Read the chunk - var n int - n, err = io.ReadFull(up.in, buf) - if err == io.ErrUnexpectedEOF { - fs.Debugf(up.o, "Read less than a full chunk, making this the last one.") - buf = buf[:n] - hasMoreParts = false - err = nil - } else if err == io.EOF { - fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.") - up.f.putUploadBlock(buf) - err = nil - break outer - } else if err != nil { - // other kinds of errors indicate failure - up.f.putUploadBlock(buf) - break outer - } - - // Keep stats up to date - up.parts = part - up.size += int64(n) - if part > maxParts { - err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts) - break outer - } - - // Transfer the chunk - up.managedTransferChunk(ctx, &wg, errs, part, buf) + return nil + }) + err = g.Wait() + if err != nil { + return err } - wg.Wait() up.sha1s = up.sha1s[:up.parts] - - return up.finishOrCancelOnError(ctx, err, errs) + return up.finish(ctx) } // Upload uploads the chunks from the input -func (up *largeUpload) Upload(ctx context.Context) error { - fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) - remaining := up.size - errs := make(chan error, 1) - var wg sync.WaitGroup - var err error -outer: - for part := int64(1); part <= up.parts; part++ { - // Check any errors - select { - case err = <-errs: - break outer - default: +func (up *largeUpload) Upload(ctx context.Context) (err error) { + defer up.cancelOnError(ctx, &err) + fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id) + var ( + g, gCtx = errgroup.WithContext(ctx) + remaining = up.size + ) + g.Go(func() error { + for part := int64(1); part <= up.parts; part++ { + // Get a block of memory from the pool and token which limits concurrency. + buf := up.f.getBuf(up.doCopy) + + // Fail fast, in case an errgroup managed function returns an error + // gCtx is cancelled. There is no point in uploading all the other parts. + if gCtx.Err() != nil { + up.f.putBuf(buf, up.doCopy) + return nil + } + + reqSize := remaining + if reqSize >= up.chunkSize { + reqSize = up.chunkSize + } + + if !up.doCopy { + // Read the chunk + buf = buf[:reqSize] + _, err = io.ReadFull(up.in, buf) + if err != nil { + up.f.putBuf(buf, up.doCopy) + return err + } + } + + part := part // for the closure + g.Go(func() (err error) { + defer up.f.putBuf(buf, up.doCopy) + if !up.doCopy { + err = up.transferChunk(gCtx, part, buf) + } else { + err = up.copyChunk(gCtx, part, reqSize) + } + return err + }) + remaining -= reqSize } - - reqSize := remaining - if reqSize >= int64(up.f.opt.ChunkSize) { - reqSize = int64(up.f.opt.ChunkSize) - } - - // Get a block of memory - buf := up.f.getUploadBlock()[:reqSize] - - // Read the chunk - _, err = io.ReadFull(up.in, buf) - if err != nil { - up.f.putUploadBlock(buf) - break outer - } - - // Transfer the chunk - up.managedTransferChunk(ctx, &wg, errs, part, buf) - remaining -= reqSize + return nil + }) + err = g.Wait() + if err != nil { + return err } - wg.Wait() - - return up.finishOrCancelOnError(ctx, err, errs) + return up.finish(ctx) }