diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 441473e13..8c5c182c7 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -44,11 +44,13 @@ import (
 	"github.com/rclone/rclone/fs/fshttp"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/fs/walk"
+	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/env"
 	"github.com/rclone/rclone/lib/multipart"
 	"github.com/rclone/rclone/lib/pacer"
+	"golang.org/x/sync/errgroup"
 )

 const (
@@ -312,6 +314,47 @@ Note that chunks are stored in memory and there may be up to
 in memory.`,
 			Default:  16,
 			Advanced: true,
+		}, {
+			Name: "copy_cutoff",
+			Help: `Cutoff for switching to multipart copy.
+
+Any files larger than this that need to be server-side copied will be
+copied in chunks of chunk_size using the put block list API.
+
+Files smaller than this limit will be copied with the Copy Blob API.`,
+			Default:  8 * fs.Mebi,
+			Advanced: true,
+		}, {
+			Name: "copy_concurrency",
+			Help: `Concurrency for multipart copy.
+
+This is the number of chunks of the same file that are copied
+concurrently.
+
+These chunks are not buffered in memory and Microsoft recommends
+setting this value to greater than 1000 in the azcopy documentation.
+
+https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-optimize#increase-concurrency
+
+In tests, copy speed increases almost linearly with copy
+concurrency.`,
+			Default:  512,
+			Advanced: true,
+		}, {
+			Name: "use_copy_blob",
+			Help: `Whether to use the Copy Blob API when copying to the same storage account.
+
+If true (the default) then rclone will use the Copy Blob API for
+copies to the same storage account even when the size is above the
+copy_cutoff.
+
+Rclone assumes that the same storage account means the same config
+and does not check for the same storage account in different configs.
+
+There should be no need to change this value.
+`,
+			Default:  true,
+			Advanced: true,
 		}, {
 			Name: "list_chunk",
 			Help: `Size of blob list.
@@ -478,6 +521,9 @@ type Options struct {
 	UseAZ                 bool                 `config:"use_az"`
 	Endpoint              string               `config:"endpoint"`
 	ChunkSize             fs.SizeSuffix        `config:"chunk_size"`
+	CopyCutoff            fs.SizeSuffix        `config:"copy_cutoff"`
+	CopyConcurrency       int                  `config:"copy_concurrency"`
+	UseCopyBlob           bool                 `config:"use_copy_blob"`
 	UploadConcurrency     int                  `config:"upload_concurrency"`
 	ListChunkSize         uint                 `config:"list_chunk"`
 	AccessTier            string               `config:"access_tier"`
@@ -502,6 +548,9 @@ type Fs struct {
 	cntSVCcacheMu sync.Mutex                   // mutex to protect cntSVCcache
 	cntSVCcache   map[string]*container.Client // reference to containerClient per container
 	svc           *service.Client              // client to access azblob
+	cred          azcore.TokenCredential       // how to generate tokens (may be nil)
+	sharedKeyCred *service.SharedKeyCredential // shared key credentials (may be nil)
+	anonymous     bool                         // if this is anonymous access
 	rootContainer string                       // container part of root (if any)
 	rootDirectory string                       // directory part of root (if any)
 	isLimited     bool                         // if limited to one container
@@ -641,6 +690,14 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }

+func (f *Fs) setCopyCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadChunkSize(cs)
+	if err == nil {
+		old, f.opt.CopyCutoff = f.opt.CopyCutoff, cs
+	}
+	return
+}
+
 type servicePrincipalCredentials struct {
 	AppID    string `json:"appId"`
 	Password string `json:"password"`
@@ -726,12 +783,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 	f.publicAccess = container.PublicAccessType(opt.PublicAccess)
 	f.setRoot(root)
 	f.features = (&fs.Features{
-		ReadMimeType:      true,
-		WriteMimeType:     true,
-		BucketBased:       true,
-		BucketBasedRootOK: true,
-		SetTier:           true,
-		GetTier:           true,
+		ReadMimeType:            true,
+		WriteMimeType:           true,
+		BucketBased:             true,
+		BucketBasedRootOK:       true,
+		SetTier:                 true,
+		GetTier:                 true,
+		ServerSideAcrossConfigs: true,
 	}).Fill(ctx, f)
 	if opt.DirectoryMarkers {
 		f.features.CanHaveEmptyDirectories = true
@@ -746,12 +804,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		ClientOptions: policyClientOptions,
 	}

-	// Here we auth by setting one of cred, sharedKeyCred, f.svc or anonymous
-	var (
-		cred          azcore.TokenCredential
-		sharedKeyCred *service.SharedKeyCredential
-		anonymous     = false
-	)
+	// Here we auth by setting one of f.cred, f.sharedKeyCred, f.svc or f.anonymous
 	switch {
 	case opt.EnvAuth:
 		// Read account from environment if needed
@@ -763,7 +816,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 			ClientOptions:            policyClientOptions,
 			DisableInstanceDiscovery: opt.DisableInstanceDiscovery,
 		}
-		cred, err = azidentity.NewDefaultAzureCredential(&options)
+		f.cred, err = azidentity.NewDefaultAzureCredential(&options)
 		if err != nil {
 			return nil, fmt.Errorf("create azure environment credential failed: %w", err)
 		}
@@ -777,12 +830,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		if opt.Endpoint == "" {
 			opt.Endpoint = emulatorBlobEndpoint
 		}
-		sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
+		f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
 		if err != nil {
 			return nil, fmt.Errorf("create new shared key credential for emulator failed: %w", err)
 		}
 	case opt.Account != "" && opt.Key != "":
-		sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
+		f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
 		if err != nil {
fmt.Errorf("create new shared key credential failed: %w", err) } @@ -817,7 +870,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e options := azidentity.ClientSecretCredentialOptions{ ClientOptions: policyClientOptions, } - cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options) + f.cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options) if err != nil { return nil, fmt.Errorf("error creating a client secret credential: %w", err) } @@ -851,7 +904,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ClientOptions: policyClientOptions, SendCertificateChain: opt.ClientSendCertificateChain, } - cred, err = azidentity.NewClientCertificateCredential( + f.cred, err = azidentity.NewClientCertificateCredential( opt.Tenant, opt.ClientID, certs, key, &options, ) if err != nil { @@ -866,7 +919,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e if err != nil { return nil, fmt.Errorf("user password decode failed - did you obscure it?: %w", err) } - cred, err = azidentity.NewUsernamePasswordCredential( + f.cred, err = azidentity.NewUsernamePasswordCredential( opt.Tenant, opt.ClientID, opt.Username, password, &options, ) if err != nil { @@ -885,7 +938,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e options := azidentity.ClientSecretCredentialOptions{ ClientOptions: policyClientOptions, } - cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options) + f.cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options) if err != nil { return nil, fmt.Errorf("error creating a client secret credential: %w", err) } @@ -907,19 +960,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e case opt.MSIResourceID != "": options.ID = azidentity.ResourceID(opt.MSIResourceID) } - cred, err = azidentity.NewManagedIdentityCredential(&options) + f.cred, err = azidentity.NewManagedIdentityCredential(&options) if err != nil { return nil, fmt.Errorf("failed to acquire MSI token: %w", err) } case opt.UseAZ: var options = azidentity.AzureCLICredentialOptions{} - cred, err = azidentity.NewAzureCLICredential(&options) + f.cred, err = azidentity.NewAzureCLICredential(&options) if err != nil { return nil, fmt.Errorf("failed to create Azure CLI credentials: %w", err) } case opt.Account != "": // Anonymous access - anonymous = true + f.anonymous = true default: return nil, errors.New("no authentication method configured") } @@ -937,19 +990,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e } opt.Endpoint = u.String() } - if sharedKeyCred != nil { + if f.sharedKeyCred != nil { // Shared key cred - f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, sharedKeyCred, &clientOpt) + f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, f.sharedKeyCred, &clientOpt) if err != nil { return nil, fmt.Errorf("create client with shared key failed: %w", err) } - } else if cred != nil { + } else if f.cred != nil { // Azidentity cred - f.svc, err = service.NewClient(opt.Endpoint, cred, &clientOpt) + f.svc, err = service.NewClient(opt.Endpoint, f.cred, &clientOpt) if err != nil { return nil, fmt.Errorf("create client failed: %w", err) } - } else if anonymous { + } else if f.anonymous { // Anonymous public access 
 		f.svc, err = service.NewClientWithNoCredential(opt.Endpoint, &clientOpt)
 		if err != nil {
@@ -1503,7 +1556,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
 			// When a container is deleted, a container with the same name cannot be created
 			// for at least 30 seconds; the container may not be available for more than 30
 			// seconds if the service is still processing the request.
-			time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
+			time.Sleep(12 * time.Second) // default 10 retries will be 120 seconds
 			f.cache.MarkDeleted(container)
 			return true, err
 		case bloberror.AuthorizationFailure:
@@ -1611,6 +1664,214 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
 	return f.deleteContainer(ctx, container)
 }

+// getAuth gets auth to copy o.
+//
+// tokenOK is used to signal that token based auth (Microsoft Entra
+// ID) is acceptable.
+//
+// This will return srcURL to read the object, which may be a SAS URL.
+//
+// If noAuth is set then the srcURL returned will be a plain object
+// URL (not a SAS) and token will be empty.
+//
+// If tokenOK is true it may also return a token for the auth.
+func (o *Object) getAuth(ctx context.Context, tokenOK bool, noAuth bool) (srcURL string, token *string, err error) {
+	f := o.fs
+	srcBlobSVC := o.getBlobSVC()
+	srcURL = srcBlobSVC.URL()
+
+	switch {
+	case noAuth:
+		// If same storage account then no auth needed
+	case f.cred != nil:
+		if !tokenOK {
+			return srcURL, token, errors.New("not supported: Microsoft Entra ID")
+		}
+		// Request a token scoped to Azure Storage - GetToken needs a
+		// non-empty scope list.
+		options := policy.TokenRequestOptions{Scopes: []string{"https://storage.azure.com/.default"}}
+		accessToken, err := f.cred.GetToken(ctx, options)
+		if err != nil {
+			return srcURL, token, fmt.Errorf("failed to create access token: %w", err)
+		}
+		token = &accessToken.Token
+	case f.sharedKeyCred != nil:
+		// Generate a short lived SAS URL if using shared key credentials
+		expiry := time.Now().Add(time.Hour)
+		sasOptions := blob.GetSASURLOptions{}
+		srcURL, err = srcBlobSVC.GetSASURL(sas.BlobPermissions{Read: true}, expiry, &sasOptions)
+		if err != nil {
+			return srcURL, token, fmt.Errorf("failed to create SAS URL: %w", err)
+		}
+	case f.anonymous || f.opt.SASURL != "":
+		// If using a SASURL or anonymous, no need for any extra auth
+	default:
+		return srcURL, token, errors.New("unknown authentication type")
+	}
+	return srcURL, token, nil
+}
+
+// Do multipart parallel copy.
+//
+// This uses these APIs:
+//
+// - PutBlockFromURL - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-from-url
+// - PutBlockList - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
+func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
+	srcProperties, err := src.readMetaDataAlways(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("multipart copy: failed to read source object: %w", err)
+	}
+
+	// Create the dst object by altering a copy of the src object
+	obj := *src
+	o := &obj
+	o.fs = f
+	o.remote = remote
+
+	srcURL, token, err := src.getAuth(ctx, true, false)
+	if err != nil {
+		return nil, fmt.Errorf("multipart copy: %w", err)
+	}
+
+	bic, err := newBlockIDCreator()
+	if err != nil {
+		return nil, err
+	}
+
+	dstBlockBlobSVC := f.getBlockBlobSVC(dstContainer, dstPath)
+
+	defer atexit.OnError(&err, func() {
+		// Try to abort the upload, but ignore the error.
+		fs.Debugf(o, "Cancelling multipart copy")
+		_ = o.clearUncommittedBlocks(ctx)
+	})()
+
+	var (
+		srcSize  = src.size
+		partSize = int64(chunksize.Calculator(o, src.size, blockblob.MaxBlocks, f.opt.ChunkSize))
+		numParts = (srcSize-1)/partSize + 1
+		blockIDs = make([]string, numParts) // list of blocks for finalize
+		g, gCtx  = errgroup.WithContext(ctx)
+	)
+	g.SetLimit(f.opt.CopyConcurrency)
+
+	fs.Debugf(o, "Starting multipart copy with %d parts of size %v", numParts, fs.SizeSuffix(partSize))
+	for partNum := uint64(0); partNum < uint64(numParts); partNum++ {
+		// Fail fast: if an errgroup managed function returns an error, gCtx is
+		// cancelled and there is no point in starting the remaining parts.
+		if gCtx.Err() != nil {
+			break
+		}
+		partNum := partNum // for closure
+		g.Go(func() error {
+			blockID := bic.newBlockID(partNum)
+			options := blockblob.StageBlockFromURLOptions{
+				Range: blob.HTTPRange{
+					Offset: int64(partNum) * partSize,
+					Count:  partSize,
+				},
+				// Specifies the authorization scheme and signature for the copy source.
+				CopySourceAuthorization: token,
+				// CPKInfo      *blob.CPKInfo
+				// CPKScopeInfo *blob.CPKScopeInfo
+			}
+			// Partial last block
+			if remaining := srcSize - options.Range.Offset; remaining < options.Range.Count {
+				options.Range.Count = remaining
+			}
+			fs.Debugf(o, "multipart copy: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(options.Range.Count), fs.SizeSuffix(options.Range.Offset), fs.SizeSuffix(srcSize))
+			err := f.pacer.Call(func() (bool, error) {
+				_, err := dstBlockBlobSVC.StageBlockFromURL(gCtx, blockID, srcURL, &options)
+				if err != nil {
+					return f.shouldRetry(gCtx, err)
+				}
+				return false, nil
+			})
+			if err != nil {
+				return fmt.Errorf("multipart copy: failed to copy chunk %d with %v bytes: %w", partNum+1, options.Range.Count, err)
+			}
+			blockIDs[partNum] = blockID
+			return nil
+		})
+	}
+	err = g.Wait()
+	if err != nil {
+		return nil, err
+	}
+
+	// Convert metadata from source object
+	options := blockblob.CommitBlockListOptions{
+		Metadata: srcProperties.Metadata,
+		Tier:     parseTier(f.opt.AccessTier),
+		HTTPHeaders: &blob.HTTPHeaders{
+			BlobCacheControl:       srcProperties.CacheControl,
+			BlobContentDisposition: srcProperties.ContentDisposition,
+			BlobContentEncoding:    srcProperties.ContentEncoding,
+			BlobContentLanguage:    srcProperties.ContentLanguage,
+			BlobContentMD5:         srcProperties.ContentMD5,
+			BlobContentType:        srcProperties.ContentType,
+		},
+	}
+
+	// Finalise the upload session
+	err = f.pacer.Call(func() (bool, error) {
+		_, err := dstBlockBlobSVC.CommitBlockList(ctx, blockIDs, &options)
+		return f.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to complete multipart copy: %w", err)
+	}
+
+	fs.Debugf(o, "multipart copy finished")
+	return f.NewObject(ctx, remote)
+}
+
+// Do single part copy.
+//
+// This uses these APIs:
+//
+// - Copy Blob - https://docs.microsoft.com/rest/api/storageservices/copy-blob
+// - Get Blob Properties - https://docs.microsoft.com/rest/api/storageservices/get-blob-properties
+func (f *Fs) copySinglepart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
+	dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
+
+	// Get the source auth - none needed for same storage account
+	srcURL, _, err := src.getAuth(ctx, false, f == src.fs)
+	if err != nil {
+		return nil, fmt.Errorf("single part copy: source auth: %w", err)
+	}
+
+	// Start the copy
+	options := blob.StartCopyFromURLOptions{
+		Tier: parseTier(f.opt.AccessTier),
+	}
+	var startCopy blob.StartCopyFromURLResponse
+	err = f.pacer.Call(func() (bool, error) {
+		startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
+		return f.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("single part copy: copy blob: %w", err)
+	}
+
+	// Poll for completion if necessary
+	//
+	// The for loop is never executed for same storage account copies.
+	copyStatus := startCopy.CopyStatus
+	getOptions := blob.GetPropertiesOptions{}
+	pollTime := 100 * time.Millisecond
+	for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
+		time.Sleep(pollTime)
+		getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
+		if err != nil {
+			return nil, err
+		}
+		copyStatus = getMetadata.CopyStatus
+		pollTime = min(2*pollTime, time.Second)
+	}
+
+	return f.NewObject(ctx, remote)
+}
+
 // Copy src to this remote using server-side copy operations.
 //
 // This is stored with the remote path given.
@@ -1631,36 +1892,29 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
 		fs.Debugf(src, "Can't copy - not same remote type")
 		return nil, fs.ErrorCantCopy
 	}
-	dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
-	srcBlobSVC := srcObj.getBlobSVC()
-	srcURL := srcBlobSVC.URL()
-	options := blob.StartCopyFromURLOptions{
-		Tier: parseTier(f.opt.AccessTier),
-	}
-	var startCopy blob.StartCopyFromURLResponse
-	err = f.pacer.Call(func() (bool, error) {
-		startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
-		return f.shouldRetry(ctx, err)
-	})
-	if err != nil {
-		return nil, err
+	// Assume we are copying to a different storage account if we
+	// are copying across configs.
+	sameStorageAccount := f == srcObj.fs
+
+	// If we are using Microsoft Entra ID token based auth then
+	// copySinglepart does not work.
+	usingEntraID := f.cred != nil
+
+	// Use multipart copy if size >= cutoff, or if using Entra ID and
+	// not copying within the same storage account.
+	useMultiPart := srcObj.size >= int64(f.opt.CopyCutoff) || (usingEntraID && !sameStorageAccount)
+
+	// Force the use of copy blob if on the same storage account
+	// and the user hasn't forbidden it.
+	if f.opt.UseCopyBlob && sameStorageAccount {
+		useMultiPart = false
 	}
-	copyStatus := startCopy.CopyStatus
-	getOptions := blob.GetPropertiesOptions{}
-	pollTime := 100 * time.Millisecond
-	for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
-		time.Sleep(pollTime)
-		getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
-		if err != nil {
-			return nil, err
-		}
-		copyStatus = getMetadata.CopyStatus
-		pollTime = min(2*pollTime, time.Second)
+	if useMultiPart {
+		return f.copyMultipart(ctx, remote, dstContainer, dstPath, srcObj)
 	}
-
-	return f.NewObject(ctx, remote)
+	return f.copySinglepart(ctx, remote, dstContainer, dstPath, srcObj)
 }

 // ------------------------------------------------------------

diff --git a/backend/azureblob/azureblob_test.go b/backend/azureblob/azureblob_test.go
index 1d498bb82..6f2b65acf 100644
--- a/backend/azureblob/azureblob_test.go
+++ b/backend/azureblob/azureblob_test.go
@@ -15,13 +15,17 @@ import (

 // TestIntegration runs integration tests against the remote
 func TestIntegration(t *testing.T) {
+	name := "TestAzureBlob"
 	fstests.Run(t, &fstests.Opt{
-		RemoteName:  "TestAzureBlob:",
+		RemoteName:  name + ":",
 		NilObject:   (*Object)(nil),
 		TiersToTest: []string{"Hot", "Cool", "Cold"},
 		ChunkedUpload: fstests.ChunkedUploadConfig{
 			MinChunkSize: defaultChunkSize,
 		},
+		ExtraConfig: []fstests.ExtraConfigItem{
+			{Name: name, Key: "use_copy_blob", Value: "false"},
+		},
 	})
 }

@@ -40,6 +44,7 @@ func TestIntegration2(t *testing.T) {
 		},
 		ExtraConfig: []fstests.ExtraConfigItem{
 			{Name: name, Key: "directory_markers", Value: "true"},
+			{Name: name, Key: "use_copy_blob", Value: "false"},
 		},
 	})
 }

@@ -48,8 +53,13 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }

+func (f *Fs) SetCopyCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setCopyCutoff(cs)
+}
+
 var (
 	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetCopyCutoffer     = (*Fs)(nil)
 )

 func TestValidateAccessTier(t *testing.T) {
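
Note on the range arithmetic in copyMultipart above: the source is split into numParts ranges of partSize bytes, with the final range shortened to whatever remains. The sketch below is not part of the patch; it uses a fixed partSize where the real code calls chunksize.Calculator so that numParts never exceeds blockblob.MaxBlocks.

package main

import "fmt"

func main() {
	const (
		srcSize  int64 = 20<<20 + 12345 // a little over 20 MiB
		partSize int64 = 8 << 20        // stand-in for chunksize.Calculator's result
	)
	// Ceiling division, as in copyMultipart.
	numParts := (srcSize-1)/partSize + 1
	for partNum := int64(0); partNum < numParts; partNum++ {
		offset := partNum * partSize
		count := partSize
		// Partial last block, mirroring the patch.
		if remaining := srcSize - offset; remaining < count {
			count = remaining
		}
		fmt.Printf("part %d: offset=%d count=%d\n", partNum, offset, count)
	}
}

Running this prints three parts: two of 8 MiB and a final short one, which is exactly the set of HTTPRange values that StageBlockFromURL would be called with.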
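
The routing decision in (*Fs).Copy can also be read as a pure function. This is an illustrative extraction, not code from the patch; the parameters stand in for srcObj.size, f.opt.CopyCutoff, f.cred != nil, f == srcObj.fs and f.opt.UseCopyBlob.

package main

import "fmt"

// useMultipartCopy mirrors the decision logic in (*Fs).Copy above.
func useMultipartCopy(size, cutoff int64, usingEntraID, sameStorageAccount, useCopyBlob bool) bool {
	useMultiPart := size >= cutoff || (usingEntraID && !sameStorageAccount)
	// Copy Blob wins within one storage account unless the user disabled it.
	if useCopyBlob && sameStorageAccount {
		useMultiPart = false
	}
	return useMultiPart
}

func main() {
	const cutoff = 8 << 20 // the 8 MiB copy_cutoff default
	fmt.Println(useMultipartCopy(100<<20, cutoff, false, true, true))  // false: same account, Copy Blob preferred
	fmt.Println(useMultipartCopy(100<<20, cutoff, false, false, true)) // true: large cross-account copy
	fmt.Println(useMultipartCopy(1<<20, cutoff, true, false, true))    // true: Entra ID forces multipart across accounts
}

The second case is why ServerSideAcrossConfigs is enabled in NewFs: a cross-config copy is assumed to cross storage accounts and so takes the multipart path when it cannot use Copy Blob.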
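
For reference, and assuming rclone's usual option-to-flag mapping (underscores become dashes with an azureblob- prefix), the new options would surface roughly like this; the remote names az and az2 here are hypothetical:

$ rclone copy az:container/big.bin az2:container/ \
      --azureblob-use-copy-blob=false \
      --azureblob-copy-cutoff 16Mi \
      --azureblob-copy-concurrency 1024

The integration tests above pin use_copy_blob=false via ExtraConfig so that the multipart copy path is exercised; they are run in the usual fstests way, something like:

$ go test ./backend/azureblob -run TestIntegration -remote TestAzureBlob: -v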