From eaeef4811fd3fe52b1b35217f7b8ad484fee183b Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 6 Aug 2019 22:17:40 +0100 Subject: [PATCH] swift: make all operations work from the root #3421 --- backend/swift/swift.go | 351 ++++++++++++++++++++++------------------- 1 file changed, 186 insertions(+), 165 deletions(-) diff --git a/backend/swift/swift.go b/backend/swift/swift.go index 7c1cfbe64..fccf65d57 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -8,10 +8,8 @@ import ( "fmt" "io" "path" - "regexp" "strconv" "strings" - "sync" "time" "github.com/ncw/swift" @@ -24,6 +22,7 @@ import ( "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/readers" ) @@ -208,17 +207,16 @@ type Options struct { // Fs represents a remote swift server type Fs struct { - name string // name of this remote - root string // the path we are working on if any - features *fs.Features // optional features - opt Options // options for this backend - c *swift.Connection // the connection to the swift server - container string // the container we are working on - containerOKMu sync.Mutex // mutex to protect container OK - containerOK bool // true if we have created the container - segmentsContainer string // container to store the segments (if any) in - noCheckContainer bool // don't check the container before creating it - pacer *fs.Pacer // To pace the API calls + name string // name of this remote + root string // the path we are working on if any + features *fs.Features // optional features + opt Options // options for this backend + c *swift.Connection // the connection to the swift server + rootContainer string // container part of root (if any) + rootDirectory string // directory part of root (if any) + cache *bucket.Cache // cache of container status + noCheckContainer bool // don't check the container before creating it + pacer *fs.Pacer // To pace the API calls } // Object describes a swift object @@ -243,18 +241,18 @@ func (f *Fs) Name() string { // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { - if f.root == "" { - return f.container - } - return f.container + "/" + f.root + return f.root } // String converts this Fs to a string func (f *Fs) String() string { - if f.root == "" { - return fmt.Sprintf("Swift container %s", f.container) + if f.rootContainer == "" { + return fmt.Sprintf("Swift root") } - return fmt.Sprintf("Swift container %s path %s", f.container, f.root) + if f.rootDirectory == "" { + return fmt.Sprintf("Swift container %s", f.rootContainer) + } + return fmt.Sprintf("Swift container %s path %s", f.rootContainer, f.rootDirectory) } // Features returns the optional features of this Fs @@ -313,21 +311,23 @@ func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) { return shouldRetry(err) } -// Pattern to match a swift path -var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) - -// parseParse parses a swift 'url' -func parsePath(path string) (container, directory string, err error) { - parts := matcher.FindStringSubmatch(path) - if parts == nil { - err = errors.Errorf("couldn't find container in swift path %q", path) - } else { - container, directory = parts[1], parts[2] - directory = strings.Trim(directory, "/") - } +// parsePath parses a remote 'url' +func parsePath(path string) (root string) { + root = strings.Trim(path, "/") return } +// split returns container and containerPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (container, containerPath string) { + return bucket.Split(path.Join(f.root, rootRelativePath)) +} + +// split returns container and containerPath from the object +func (o *Object) split() (container, containerPath string) { + return o.fs.split(o.remote) +} + // swiftConnection makes a connection to swift func swiftConnection(opt *Options, name string) (*swift.Connection, error) { c := &swift.Connection{ @@ -410,47 +410,48 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) return } +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = parsePath(root) + f.rootContainer, f.rootDirectory = bucket.Split(f.root) +} + // NewFsWithConnection constructs an Fs from the path, container:path // and authenticated connection. // // if noCheckContainer is set then the Fs won't check the container // exists before creating it. func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, noCheckContainer bool) (fs.Fs, error) { - container, directory, err := parsePath(root) - if err != nil { - return nil, err - } f := &Fs{ - name: name, - opt: *opt, - c: c, - container: container, - segmentsContainer: container + "_segments", - root: directory, - noCheckContainer: noCheckContainer, - pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))), + name: name, + opt: *opt, + c: c, + noCheckContainer: noCheckContainer, + pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))), + cache: bucket.NewCache(), } + f.setRoot(root) f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + BucketBasedRootOK: true, }).Fill(f) - if f.root != "" { - f.root += "/" + if f.rootContainer != "" && f.rootDirectory != "" { // Check to see if the object exists - ignoring directory markers var info swift.Object + var err error err = f.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers - info, rxHeaders, err = f.c.Object(container, directory) + info, rxHeaders, err = f.c.Object(f.rootContainer, f.rootDirectory) return shouldRetryHeaders(rxHeaders, err) }) if err == nil && info.ContentType != directoryMarkerContentType { - f.root = path.Dir(directory) - if f.root == "." { - f.root = "" - } else { - f.root += "/" + newRoot := path.Dir(f.root) + if newRoot == "." { + newRoot = "" } + f.setRoot(newRoot) // return an error with an fs which points to the parent return f, fs.ErrorIsFile } @@ -518,23 +519,26 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { type listFn func(remote string, object *swift.Object, isDirectory bool) error // listContainerRoot lists the objects into the function supplied from -// the container and root supplied +// the container and directory supplied. The remote has prefix +// removed from it and if addContainer is set then it adds the +// container to the start. // // Set recurse to read sub directories -func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool, fn listFn) error { - prefix := root - if dir != "" { - prefix += dir + "/" +func (f *Fs) listContainerRoot(container, directory, prefix string, addContainer bool, recurse bool, fn listFn) error { + if prefix != "" { + prefix += "/" + } + if directory != "" { + directory += "/" } // Options for ObjectsWalk opts := swift.ObjectsOpts{ - Prefix: prefix, + Prefix: directory, Limit: listChunks, } if !recurse { opts.Delimiter = '/' } - rootLength := len(root) return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) { var objects []swift.Object var err error @@ -559,7 +563,10 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool, // duplicate directories. Ignore them here. continue } - remote := object.Name[rootLength:] + remote := object.Name[len(prefix):] + if addContainer { + remote = path.Join(container, remote) + } err = fn(remote, object, isDirectory) if err != nil { break @@ -573,8 +580,8 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool, type addEntryFn func(fs.DirEntry) error // list the objects into the function supplied -func (f *Fs) list(dir string, recurse bool, fn addEntryFn) error { - err := f.listContainerRoot(f.container, f.root, dir, recurse, func(remote string, object *swift.Object, isDirectory bool) (err error) { +func (f *Fs) list(container, directory, prefix string, addContainer bool, recurse bool, fn addEntryFn) error { + err := f.listContainerRoot(container, directory, prefix, addContainer, recurse, func(remote string, object *swift.Object, isDirectory bool) (err error) { if isDirectory { remote = strings.TrimRight(remote, "/") d := fs.NewDir(remote, time.Time{}).SetSize(object.Bytes) @@ -598,22 +605,13 @@ func (f *Fs) list(dir string, recurse bool, fn addEntryFn) error { return err } -// mark the container as being OK -func (f *Fs) markContainerOK() { - if f.container != "" { - f.containerOKMu.Lock() - f.containerOK = true - f.containerOKMu.Unlock() - } -} - // listDir lists a single directory -func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) { - if f.container == "" { +func (f *Fs) listDir(container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) { + if container == "" { return nil, fs.ErrorListBucketRequired } // List the objects - err = f.list(dir, false, func(entry fs.DirEntry) error { + err = f.list(container, directory, prefix, addContainer, false, func(entry fs.DirEntry) error { entries = append(entries, entry) return nil }) @@ -621,7 +619,7 @@ func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) { return nil, err } // container must be present if listing succeeded - f.markContainerOK() + f.cache.MarkOK(container) return entries, nil } @@ -639,6 +637,7 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { return nil, errors.Wrap(err, "container listing failed") } for _, container := range containers { + f.cache.MarkOK(container.Name) d := fs.NewDir(container.Name, time.Time{}).SetSize(container.Bytes).SetItems(container.Count) entries = append(entries, d) } @@ -655,10 +654,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if f.container == "" { - return f.listContainers(dir) + container, directory := f.split(dir) + if container == "" { + return f.listContainers(directory) } - return f.listDir(dir) + return f.listDir(container, directory, f.rootDirectory, f.rootContainer == "") } // ListR lists the objects and directories of the Fs starting @@ -676,20 +676,39 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // immediately. // // Don't implement this unless you have a more efficient way -// of listing recursively that doing a directory traversal. +// of listing recursively than doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - if f.container == "" { - return errors.New("container needed for recursive list") - } + container, directory := f.split(dir) list := walk.NewListRHelper(callback) - err = f.list(dir, true, func(entry fs.DirEntry) error { - return list.Add(entry) - }) - if err != nil { - return err + listR := func(container, directory, prefix string, addContainer bool) error { + return f.list(container, directory, prefix, addContainer, true, func(entry fs.DirEntry) error { + return list.Add(entry) + }) + } + if container == "" { + entries, err := f.listContainers("") + if err != nil { + return err + } + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + container := entry.Remote() + err = listR(container, "", f.rootDirectory, true) + if err != nil { + return err + } + } + } else { + err = listR(container, directory, f.rootDirectory, f.rootContainer == "") + if err != nil { + return err + } } // container must be present if listing succeeded - f.markContainerOK() + f.cache.MarkOK(container) return list.Flush() } @@ -738,57 +757,52 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - f.containerOKMu.Lock() - defer f.containerOKMu.Unlock() - if f.containerOK { - return nil - } - // if we are at the root, then it is OK - if f.container == "" { - return nil - } - // Check to see if container exists first - var err error = swift.ContainerNotFound - if !f.noCheckContainer { - err = f.pacer.Call(func() (bool, error) { - var rxHeaders swift.Headers - _, rxHeaders, err = f.c.Container(f.container) - return shouldRetryHeaders(rxHeaders, err) - }) - } - if err == swift.ContainerNotFound { - headers := swift.Headers{} - if f.opt.StoragePolicy != "" { - headers["X-Storage-Policy"] = f.opt.StoragePolicy + container, _ := f.split(dir) + return f.cache.Create(container, func() error { + // Check to see if container exists first + var err error = swift.ContainerNotFound + if !f.noCheckContainer { + err = f.pacer.Call(func() (bool, error) { + var rxHeaders swift.Headers + _, rxHeaders, err = f.c.Container(container) + return shouldRetryHeaders(rxHeaders, err) + }) } - err = f.pacer.Call(func() (bool, error) { - err = f.c.ContainerCreate(f.container, headers) - return shouldRetry(err) - }) - } - if err == nil { - f.containerOK = true - } - return err + if err == swift.ContainerNotFound { + headers := swift.Headers{} + if f.opt.StoragePolicy != "" { + headers["X-Storage-Policy"] = f.opt.StoragePolicy + } + err = f.pacer.Call(func() (bool, error) { + err = f.c.ContainerCreate(container, headers) + return shouldRetry(err) + }) + if err == nil { + fs.Infof(f, "Container %q created", container) + } + } + return err + }, nil) } // Rmdir deletes the container if the fs is at the root // // Returns an error if it isn't empty func (f *Fs) Rmdir(ctx context.Context, dir string) error { - f.containerOKMu.Lock() - defer f.containerOKMu.Unlock() - if f.root != "" || dir != "" { + container, directory := f.split(dir) + if container == "" || directory != "" { return nil } - var err error - err = f.pacer.Call(func() (bool, error) { - err = f.c.ContainerDelete(f.container) - return shouldRetry(err) + err := f.cache.Remove(container, func() error { + err := f.pacer.Call(func() (bool, error) { + err := f.c.ContainerDelete(container) + return shouldRetry(err) + }) + if err == nil { + fs.Infof(f, "Container %q removed", container) + } + return err }) - if err == nil { - f.containerOK = false - } return err } @@ -807,7 +821,7 @@ func (f *Fs) Purge(ctx context.Context) error { go func() { delErr <- operations.DeleteFiles(ctx, toBeDeleted) }() - err := f.list("", true, func(entry fs.DirEntry) error { + err := f.list(f.rootContainer, f.rootDirectory, f.rootDirectory, f.rootContainer == "", true, func(entry fs.DirEntry) error { if o, ok := entry.(*Object); ok { toBeDeleted <- o } @@ -834,6 +848,7 @@ func (f *Fs) Purge(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + dstContainer, dstPath := f.split(remote) err := f.Mkdir(ctx, "") if err != nil { return nil, err @@ -843,10 +858,10 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - srcFs := srcObj.fs + srcContainer, srcPath := srcObj.split() err = f.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers - rxHeaders, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil) + rxHeaders, err = f.c.ObjectCopy(srcContainer, srcPath, dstContainer, dstPath, nil) return shouldRetryHeaders(rxHeaders, err) }) if err != nil { @@ -955,8 +970,9 @@ func (o *Object) readMetaData() (err error) { } var info swift.Object var h swift.Headers + container, containerPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { - info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote) + info, h, err = o.fs.c.Object(container, containerPath) return shouldRetryHeaders(h, err) }) if err != nil { @@ -1013,8 +1029,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { newHeaders[k] = v } } + container, containerPath := o.split() return o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders) + err = o.fs.c.ObjectUpdate(container, containerPath, newHeaders) return shouldRetry(err) }) } @@ -1032,9 +1049,10 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read fs.FixRangeOption(options, o.size) headers := fs.OpenOptionHeaders(options) _, isRanging := headers["Range"] + container, containerPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers - in, rxHeaders, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers) + in, rxHeaders, err = o.fs.c.ObjectOpen(container, containerPath, !isRanging, headers) return shouldRetryHeaders(rxHeaders, err) }) return @@ -1052,20 +1070,20 @@ func min(x, y int64) int64 { // // if except is passed in then segments with that prefix won't be deleted func (o *Object) removeSegments(except string) error { - segmentsRoot := o.fs.root + o.remote + "/" - err := o.fs.listContainerRoot(o.fs.segmentsContainer, segmentsRoot, "", true, func(remote string, object *swift.Object, isDirectory bool) error { + container, containerPath := o.split() + segmentsContainer := container + "_segments" + err := o.fs.listContainerRoot(segmentsContainer, containerPath, "", false, true, func(remote string, object *swift.Object, isDirectory bool) error { if isDirectory { return nil } if except != "" && strings.HasPrefix(remote, except) { - // fs.Debugf(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.fs.segmentsContainer) + // fs.Debugf(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, segmentsContainer) return nil } - segmentPath := segmentsRoot + remote - fs.Debugf(o, "Removing segment file %q in container %q", segmentPath, o.fs.segmentsContainer) + fs.Debugf(o, "Removing segment file %q in container %q", remote, segmentsContainer) var err error return o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath) + err = o.fs.c.ObjectDelete(segmentsContainer, remote) return shouldRetry(err) }) }) @@ -1074,11 +1092,11 @@ func (o *Object) removeSegments(except string) error { } // remove the segments container if empty, ignore errors err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ContainerDelete(o.fs.segmentsContainer) + err = o.fs.c.ContainerDelete(segmentsContainer) return shouldRetry(err) }) if err == nil { - fs.Debugf(o, "Removed empty container %q", o.fs.segmentsContainer) + fs.Debugf(o, "Removed empty container %q", segmentsContainer) } return nil } @@ -1103,11 +1121,13 @@ func urlEncode(str string) string { // updateChunks updates the existing object using chunks to a separate // container. It returns a string which prefixes current segments. func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) { + container, containerPath := o.split() + segmentsContainer := container + "_segments" // Create the segmentsContainer if it doesn't exist var err error err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers - _, rxHeaders, err = o.fs.c.Container(o.fs.segmentsContainer) + _, rxHeaders, err = o.fs.c.Container(segmentsContainer) return shouldRetryHeaders(rxHeaders, err) }) if err == swift.ContainerNotFound { @@ -1116,7 +1136,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy } err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers) + err = o.fs.c.ContainerCreate(segmentsContainer, headers) return shouldRetry(err) }) } @@ -1127,7 +1147,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, left := size i := 0 uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size) - segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix) + segmentsPath := path.Join(containerPath, uniquePrefix) in := bufio.NewReader(in0) segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1)) for { @@ -1136,7 +1156,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, if left > 0 { return "", err // read less than expected } - fs.Debugf(o, "Uploading segments into %q seems done (%v)", o.fs.segmentsContainer, err) + fs.Debugf(o, "Uploading segments into %q seems done (%v)", segmentsContainer, err) break } n := int64(o.fs.opt.ChunkSize) @@ -1147,46 +1167,45 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, } segmentReader := io.LimitReader(in, n) segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i) - fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer) + fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, segmentsContainer) err = o.fs.pacer.CallNoRetry(func() (bool, error) { var rxHeaders swift.Headers - rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers) + rxHeaders, err = o.fs.c.ObjectPut(segmentsContainer, segmentPath, segmentReader, true, "", "", headers) if err == nil { segmentInfos = append(segmentInfos, segmentPath) } return shouldRetryHeaders(rxHeaders, err) }) if err != nil { - deleteChunks(o, segmentInfos) + deleteChunks(o, segmentsContainer, segmentInfos) segmentInfos = nil return "", err } i++ } // Upload the manifest - headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s", o.fs.segmentsContainer, segmentsPath)) + headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s", segmentsContainer, segmentsPath)) headers["Content-Length"] = "0" // set Content-Length as we know it emptyReader := bytes.NewReader(nil) - manifestName := o.fs.root + o.remote err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers - rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers) + rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers) return shouldRetryHeaders(rxHeaders, err) }) if err != nil { - deleteChunks(o, segmentInfos) + deleteChunks(o, segmentsContainer, segmentInfos) segmentInfos = nil } return uniquePrefix + "/", err } -func deleteChunks(o *Object, segmentInfos []string) { +func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) { if segmentInfos != nil && len(segmentInfos) > 0 { for _, v := range segmentInfos { - fs.Debugf(o, "Delete segment file %q on %q", v, o.fs.segmentsContainer) - e := o.fs.c.ObjectDelete(o.fs.segmentsContainer, v) + fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer) + e := o.fs.c.ObjectDelete(segmentsContainer, v) if e != nil { - fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, o.fs.segmentsContainer, e) + fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, segmentsContainer, e) } } } @@ -1196,8 +1215,9 @@ func deleteChunks(o *Object, segmentInfos []string) { // // The new object may have been created if an error is returned func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - if o.fs.container == "" { - return fserrors.FatalError(errors.New("container name needed in remote")) + container, containerPath := o.split() + if container == "" { + return fserrors.FatalError(errors.New("can't upload files to the root")) } err := o.fs.Mkdir(ctx, "") if err != nil { @@ -1235,7 +1255,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } var rxHeaders swift.Headers err = o.fs.pacer.CallNoRetry(func() (bool, error) { - rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers) + rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, in, true, "", contentType, headers) return shouldRetryHeaders(rxHeaders, err) }) if err != nil { @@ -1268,13 +1288,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove an object func (o *Object) Remove(ctx context.Context) error { + container, containerPath := o.split() isDynamicLargeObject, err := o.isDynamicLargeObject() if err != nil { return err } // Remove file/manifest first err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.c.ObjectDelete(o.fs.container, o.fs.root+o.remote) + err = o.fs.c.ObjectDelete(container, containerPath) return shouldRetry(err) }) if err != nil {