From 6f16588123d72f58fdd44cbadcb6e9a87fbd6fe7 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 22 Aug 2019 21:30:55 +0100 Subject: [PATCH] s3,b2,googlecloudstorage,swift,qingstor,azureblob: fixes after code review #3421 - change the interface of listBuckets() removing dir parameter and adding context - add makeBucket() and use in place of Mkdir("") - this fixes some corner cases in Copy/Update - mark all the listed buckets OK in ListR Thanks to @yparitcher for the review. --- backend/azureblob/azureblob.go | 28 ++++++++++------ backend/b2/b2.go | 33 +++++++++++-------- .../googlecloudstorage/googlecloudstorage.go | 27 +++++++++------ backend/qingstor/qingstor.go | 28 +++++++++------- backend/s3/s3.go | 27 +++++++++------ backend/swift/swift.go | 27 +++++++++------ 6 files changed, 106 insertions(+), 64 deletions(-) diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index 18dff3cc7..b379c4767 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -654,10 +654,7 @@ func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, a } // listContainers returns all the containers to out -func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } +func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) { if f.isLimited { f.cntURLcacheMu.Lock() for container := range f.cntURLcache { @@ -691,7 +688,10 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { container, directory := f.split(dir) if container == "" { - return f.listContainers(directory) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listContainers(ctx) } return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "") } @@ -725,7 +725,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if container == "" { - entries, err := f.listContainers("") + entries, err := f.listContainers(ctx) if err != nil { return err } @@ -739,15 +739,17 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // container must be present if listing succeeded + f.cache.MarkOK(container) } } else { err = listR(container, directory, f.rootDirectory, f.rootContainer == "") if err != nil { return err } + // container must be present if listing succeeded + f.cache.MarkOK(container) } - // container must be present if listing succeeded - f.cache.MarkOK(container) return list.Flush() } @@ -800,6 +802,11 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { container, _ := f.split(dir) + return f.makeContainer(ctx, container) +} + +// makeContainer creates the container if it doesn't exist +func (f *Fs) makeContainer(ctx context.Context, container string) error { return f.cache.Create(container, func() error { // now try to create the container return f.pacer.Call(func() (bool, error) { @@ -913,7 +920,7 @@ func (f *Fs) Purge(ctx context.Context) error { // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstContainer, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeContainer(ctx, dstContainer) if err != nil { return nil, err } @@ -1362,7 +1369,8 @@ outer: // // The new object may have been created if an error is returned func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { - err = o.fs.Mkdir(ctx, "") + container, _ := o.split() + err = o.fs.makeContainer(ctx, container) if err != nil { return err } diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 4db5dc692..cbdcd9ace 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -726,10 +726,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB } // listBuckets returns all the buckets to out -func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } +func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { err = f.listBucketsToFn(func(bucket *api.Bucket) error { d := fs.NewDir(bucket.Name, time.Time{}) entries = append(entries, d) @@ -753,7 +750,10 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { bucket, directory := f.split(dir) if bucket == "" { - return f.listBuckets(directory) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listBuckets(ctx) } return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } @@ -788,7 +788,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if bucket == "" { - entries, err := f.listBuckets("") + entries, err := f.listBuckets(ctx) if err != nil { return err } @@ -802,15 +802,17 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } } else { err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) return list.Flush() } @@ -951,6 +953,11 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { bucket, _ := f.split(dir) + return f.makeBucket(ctx, bucket) +} + +// makeBucket creates the bucket if it doesn't exist +func (f *Fs) makeBucket(ctx context.Context, bucket string) error { return f.cache.Create(bucket, func() error { opts := rest.Opts{ Method: "POST", @@ -1189,7 +1196,7 @@ func (f *Fs) CleanUp(ctx context.Context) error { // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstBucket, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeBucket(ctx, dstBucket) if err != nil { return nil, err } @@ -1670,13 +1677,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if o.fs.opt.Versions { return errNotWithVersions } - err = o.fs.Mkdir(ctx, "") - if err != nil { - return err - } size := src.Size() bucket, bucketPath := o.split() + err = o.fs.makeBucket(ctx, bucket) + if err != nil { + return err + } if size == -1 { // Check if the file is large enough for a chunked upload (needs to be at least two chunks) buf := o.fs.getUploadBlock() diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index ddc162867..56fd0e49a 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -597,10 +597,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB } // listBuckets lists the buckets -func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } +func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { if f.opt.ProjectNumber == "" { return nil, errors.New("can't list buckets without project number") } @@ -638,7 +635,10 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { bucket, directory := f.split(dir) if bucket == "" { - return f.listBuckets(dir) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listBuckets(ctx) } return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } @@ -672,7 +672,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if bucket == "" { - entries, err := f.listBuckets("") + entries, err := f.listBuckets(ctx) if err != nil { return err } @@ -686,15 +686,17 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } } else { err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) return list.Flush() } @@ -720,6 +722,11 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { bucket, _ := f.split(dir) + return f.makeBucket(ctx, bucket) +} + +// makeBucket creates the bucket if it doesn't exist +func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) { return f.cache.Create(bucket, func() error { // List something from the bucket to see if it exists. Doing it like this enables the use of a // service account that only has the "Storage Object Admin" role. See #2193 for details. @@ -798,7 +805,7 @@ func (f *Fs) Precision() time.Duration { // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstBucket, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeBucket(ctx, dstBucket) if err != nil { return nil, err } @@ -1006,7 +1013,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read // The new object may have been created if an error is returned func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { bucket, bucketPath := o.split() - err := o.fs.Mkdir(ctx, "") + err := o.fs.makeBucket(ctx, bucket) if err != nil { return err } diff --git a/backend/qingstor/qingstor.go b/backend/qingstor/qingstor.go index 5acb536c9..77a712dd5 100644 --- a/backend/qingstor/qingstor.go +++ b/backend/qingstor/qingstor.go @@ -426,7 +426,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstBucket, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeBucket(ctx, dstBucket) if err != nil { return nil, err } @@ -636,11 +636,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB } // listBuckets lists the buckets to out -func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } - +func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { req := qs.ListBucketsInput{ Location: &f.zone, } @@ -668,7 +664,10 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { bucket, directory := f.split(dir) if bucket == "" { - return f.listBuckets(dir) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listBuckets(ctx) } return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } @@ -702,7 +701,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if bucket == "" { - entries, err := f.listBuckets("") + entries, err := f.listBuckets(ctx) if err != nil { return err } @@ -716,21 +715,28 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } } else { err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) return list.Flush() } // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { bucket, _ := f.split(dir) + return f.makeBucket(ctx, bucket) +} + +// makeBucket creates the bucket if it doesn't exist +func (f *Fs) makeBucket(ctx context.Context, bucket string) error { return f.cache.Create(bucket, func() error { bucketInit, err := f.svc.Bucket(bucket, f.zone) if err != nil { @@ -967,7 +973,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { // The maximum size of upload object is multipartUploadSize * MaxMultipleParts bucket, bucketPath := o.split() - err := o.fs.Mkdir(ctx, "") + err := o.fs.makeBucket(ctx, bucket) if err != nil { return err } diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 2ea83638d..5fdc731d5 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -1347,10 +1347,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB } // listBuckets lists the buckets to out -func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } +func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) { req := s3.ListBucketsInput{} var resp *s3.ListBucketsOutput err = f.pacer.Call(func() (bool, error) { @@ -1381,7 +1378,10 @@ func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { bucket, directory := f.split(dir) if bucket == "" { - return f.listBuckets(ctx, dir) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listBuckets(ctx) } return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } @@ -1415,7 +1415,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if bucket == "" { - entries, err := f.listBuckets(ctx, "") + entries, err := f.listBuckets(ctx) if err != nil { return err } @@ -1429,15 +1429,17 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } } else { err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) } - // bucket must be present if listing succeeded - f.cache.MarkOK(bucket) return list.Flush() } @@ -1481,6 +1483,11 @@ func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) { // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { bucket, _ := f.split(dir) + return f.makeBucket(ctx, bucket) +} + +// makeBucket creates the bucket if it doesn't exist +func (f *Fs) makeBucket(ctx context.Context, bucket string) error { return f.cache.Create(bucket, func() error { req := s3.CreateBucketInput{ Bucket: &bucket, @@ -1554,7 +1561,7 @@ func pathEscape(s string) string { // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstBucket, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeBucket(ctx, dstBucket) if err != nil { return nil, err } @@ -1813,7 +1820,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read // Update the Object from in with modTime and size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { bucket, bucketPath := o.split() - err := o.fs.Mkdir(ctx, "") + err := o.fs.makeBucket(ctx, bucket) if err != nil { return err } diff --git a/backend/swift/swift.go b/backend/swift/swift.go index fccf65d57..d207ef68f 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -624,10 +624,7 @@ func (f *Fs) listDir(container, directory, prefix string, addContainer bool) (en } // listContainers lists the containers -func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { - if dir != "" { - return nil, fs.ErrorListBucketRequired - } +func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) { var containers []swift.Container err = f.pacer.Call(func() (bool, error) { containers, err = f.c.ContainersAll(nil) @@ -656,7 +653,10 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { container, directory := f.split(dir) if container == "" { - return f.listContainers(directory) + if directory != "" { + return nil, fs.ErrorListBucketRequired + } + return f.listContainers(ctx) } return f.listDir(container, directory, f.rootDirectory, f.rootContainer == "") } @@ -686,7 +686,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( }) } if container == "" { - entries, err := f.listContainers("") + entries, err := f.listContainers(ctx) if err != nil { return err } @@ -700,15 +700,17 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( if err != nil { return err } + // container must be present if listing succeeded + f.cache.MarkOK(container) } } else { err = listR(container, directory, f.rootDirectory, f.rootContainer == "") if err != nil { return err } + // container must be present if listing succeeded + f.cache.MarkOK(container) } - // container must be present if listing succeeded - f.cache.MarkOK(container) return list.Flush() } @@ -758,6 +760,11 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { container, _ := f.split(dir) + return f.makeContainer(ctx, container) +} + +// makeContainer creates the container if it doesn't exist +func (f *Fs) makeContainer(ctx context.Context, container string) error { return f.cache.Create(container, func() error { // Check to see if container exists first var err error = swift.ContainerNotFound @@ -849,7 +856,7 @@ func (f *Fs) Purge(ctx context.Context) error { // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { dstContainer, dstPath := f.split(remote) - err := f.Mkdir(ctx, "") + err := f.makeContainer(ctx, dstContainer) if err != nil { return nil, err } @@ -1219,7 +1226,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if container == "" { return fserrors.FatalError(errors.New("can't upload files to the root")) } - err := o.fs.Mkdir(ctx, "") + err := o.fs.makeContainer(ctx, container) if err != nil { return err }