mirror of
https://github.com/rclone/rclone.git
synced 2024-11-23 07:07:44 +08:00
azureblob: use the rclone HTTP client - fixes #2654
This enables --dump headers and --timeout to work properly.
This commit is contained in:
parent
35fba5bfdd
commit
5d1d93e163
|
@ -22,12 +22,14 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
"github.com/ncw/rclone/fs/accounting"
|
"github.com/ncw/rclone/fs/accounting"
|
||||||
"github.com/ncw/rclone/fs/config/configmap"
|
"github.com/ncw/rclone/fs/config/configmap"
|
||||||
"github.com/ncw/rclone/fs/config/configstruct"
|
"github.com/ncw/rclone/fs/config/configstruct"
|
||||||
"github.com/ncw/rclone/fs/fserrors"
|
"github.com/ncw/rclone/fs/fserrors"
|
||||||
|
"github.com/ncw/rclone/fs/fshttp"
|
||||||
"github.com/ncw/rclone/fs/hash"
|
"github.com/ncw/rclone/fs/hash"
|
||||||
"github.com/ncw/rclone/fs/walk"
|
"github.com/ncw/rclone/fs/walk"
|
||||||
"github.com/ncw/rclone/lib/pacer"
|
"github.com/ncw/rclone/lib/pacer"
|
||||||
|
@ -135,6 +137,7 @@ type Fs struct {
|
||||||
root string // the path we are working on if any
|
root string // the path we are working on if any
|
||||||
opt Options // parsed config options
|
opt Options // parsed config options
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
|
client *http.Client // http client we are using
|
||||||
svcURL *azblob.ServiceURL // reference to serviceURL
|
svcURL *azblob.ServiceURL // reference to serviceURL
|
||||||
cntURL *azblob.ContainerURL // reference to containerURL
|
cntURL *azblob.ContainerURL // reference to containerURL
|
||||||
container string // the container we are working on
|
container string // the container we are working on
|
||||||
|
@ -272,6 +275,38 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// httpClientFactory creates a Factory object that sends HTTP requests
|
||||||
|
// to a rclone's http.Client.
|
||||||
|
//
|
||||||
|
// copied from azblob.newDefaultHTTPClientFactory
|
||||||
|
func httpClientFactory(client *http.Client) pipeline.Factory {
|
||||||
|
return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
|
||||||
|
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||||
|
r, err := client.Do(request.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
err = pipeline.NewError(err, "HTTP request failed")
|
||||||
|
}
|
||||||
|
return pipeline.NewHTTPResponse(r), err
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// newPipeline creates a Pipeline using the specified credentials and options.
|
||||||
|
//
|
||||||
|
// this code was copied from azblob.NewPipeline
|
||||||
|
func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline.Pipeline {
|
||||||
|
// Closest to API goes first; closest to the wire goes last
|
||||||
|
factories := []pipeline.Factory{
|
||||||
|
azblob.NewTelemetryPolicyFactory(o.Telemetry),
|
||||||
|
azblob.NewUniqueRequestIDPolicyFactory(),
|
||||||
|
azblob.NewRetryPolicyFactory(o.Retry),
|
||||||
|
c,
|
||||||
|
pipeline.MethodFactoryMarker(), // indicates at what stage in the pipeline the method factory is invoked
|
||||||
|
azblob.NewRequestLogPolicyFactory(o.RequestLog),
|
||||||
|
}
|
||||||
|
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs contstructs an Fs from the path, container:path
|
// NewFs contstructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
// Parse config into Options struct
|
// Parse config into Options struct
|
||||||
|
@ -307,6 +342,23 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
string(azblob.AccessTierHot), string(azblob.AccessTierCool), string(azblob.AccessTierArchive))
|
string(azblob.AccessTierHot), string(azblob.AccessTierCool), string(azblob.AccessTierArchive))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
f := &Fs{
|
||||||
|
name: name,
|
||||||
|
opt: *opt,
|
||||||
|
container: container,
|
||||||
|
root: directory,
|
||||||
|
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||||
|
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
||||||
|
client: fshttp.NewClient(fs.Config),
|
||||||
|
}
|
||||||
|
f.features = (&fs.Features{
|
||||||
|
ReadMimeType: true,
|
||||||
|
WriteMimeType: true,
|
||||||
|
BucketBased: true,
|
||||||
|
SetTier: true,
|
||||||
|
GetTier: true,
|
||||||
|
}).Fill(f)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
u *url.URL
|
u *url.URL
|
||||||
serviceURL azblob.ServiceURL
|
serviceURL azblob.ServiceURL
|
||||||
|
@ -323,7 +375,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
|
return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
|
||||||
}
|
}
|
||||||
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||||
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
||||||
containerURL = serviceURL.NewContainerURL(container)
|
containerURL = serviceURL.NewContainerURL(container)
|
||||||
case opt.SASURL != "":
|
case opt.SASURL != "":
|
||||||
|
@ -332,7 +384,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
return nil, errors.Wrapf(err, "failed to parse SAS URL")
|
return nil, errors.Wrapf(err, "failed to parse SAS URL")
|
||||||
}
|
}
|
||||||
// use anonymous credentials in case of sas url
|
// use anonymous credentials in case of sas url
|
||||||
pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
pipeline := f.newPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||||
// Check if we have container level SAS or account level sas
|
// Check if we have container level SAS or account level sas
|
||||||
parts := azblob.NewBlobURLParts(*u)
|
parts := azblob.NewBlobURLParts(*u)
|
||||||
if parts.ContainerName != "" {
|
if parts.ContainerName != "" {
|
||||||
|
@ -349,24 +401,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("Need account+key or connectionString or sasURL")
|
return nil, errors.New("Need account+key or connectionString or sasURL")
|
||||||
}
|
}
|
||||||
|
f.svcURL = &serviceURL
|
||||||
|
f.cntURL = &containerURL
|
||||||
|
|
||||||
f := &Fs{
|
|
||||||
name: name,
|
|
||||||
opt: *opt,
|
|
||||||
container: container,
|
|
||||||
root: directory,
|
|
||||||
svcURL: &serviceURL,
|
|
||||||
cntURL: &containerURL,
|
|
||||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
|
||||||
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
|
||||||
}
|
|
||||||
f.features = (&fs.Features{
|
|
||||||
ReadMimeType: true,
|
|
||||||
WriteMimeType: true,
|
|
||||||
BucketBased: true,
|
|
||||||
SetTier: true,
|
|
||||||
GetTier: true,
|
|
||||||
}).Fill(f)
|
|
||||||
if f.root != "" {
|
if f.root != "" {
|
||||||
f.root += "/"
|
f.root += "/"
|
||||||
// Check to see if the (container,directory) is actually an existing file
|
// Check to see if the (container,directory) is actually an existing file
|
||||||
|
|
Loading…
Reference in New Issue
Block a user