From f2c0f82fc6f5390cc30db612ce457ece45286f15 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 16 Mar 2021 15:50:02 +0000 Subject: [PATCH] backends: Add context checking to remaining backends #4504 This is a follow up to 4013bc4a4cd768e2 which missed some backends. It adds a ctx parameter to shouldRetry and checks it. --- backend/drive/drive.go | 143 ++++++++++++++------------- backend/drive/drive_internal_test.go | 3 +- backend/drive/upload.go | 4 +- backend/mailru/mailru.go | 29 +++--- backend/mega/mega.go | 128 ++++++++++++------------ backend/onedrive/onedrive.go | 45 +++++---- backend/s3/s3.go | 57 ++++++----- backend/swift/swift.go | 43 ++++---- backend/swift/swift_internal_test.go | 6 +- 9 files changed, 241 insertions(+), 217 deletions(-) diff --git a/backend/drive/drive.go b/backend/drive/drive.go index 28354747a..267c67e3c 100755 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -641,7 +641,10 @@ func (f *Fs) Features() *fs.Features { } // shouldRetry determines whether a given err rates being retried -func (f *Fs) shouldRetry(err error) (bool, error) { +func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } if err == nil { return false, nil } @@ -695,20 +698,20 @@ func containsString(slice []string, s string) bool { } // getFile returns drive.File for the ID passed and fields passed in -func (f *Fs) getFile(ID string, fields googleapi.Field) (info *drive.File, err error) { +func (f *Fs) getFile(ctx context.Context, ID string, fields googleapi.Field) (info *drive.File, err error) { err = f.pacer.Call(func() (bool, error) { info, err = f.svc.Files.Get(ID). Fields(fields). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) return info, err } // getRootID returns the canonical ID for the "root" ID -func (f *Fs) getRootID() (string, error) { - info, err := f.getFile("root", "id") +func (f *Fs) getRootID(ctx context.Context) (string, error) { + info, err := f.getFile(ctx, "root", "id") if err != nil { return "", errors.Wrap(err, "couldn't find root directory ID") } @@ -814,7 +817,7 @@ OUTER: var files *drive.FileList err = f.pacer.Call(func() (bool, error) { files, err = list.Fields(googleapi.Field(fields)).Context(ctx).Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return false, errors.Wrap(err, "couldn't list directory") @@ -837,7 +840,7 @@ OUTER: if filesOnly && item.ShortcutDetails.TargetMimeType == driveFolderType { continue } - item, err = f.resolveShortcut(item) + item, err = f.resolveShortcut(ctx, item) if err != nil { return false, errors.Wrap(err, "list") } @@ -855,7 +858,7 @@ OUTER: if !found { continue } - _, exportName, _, _ := f.findExportFormat(item) + _, exportName, _, _ := f.findExportFormat(ctx, item) if exportName == "" || exportName != title { continue } @@ -1155,7 +1158,7 @@ func NewFs(ctx context.Context, name, path string, m configmap.Mapper) (fs.Fs, e f.rootFolderID = f.opt.TeamDriveID } else { // otherwise look up the actual root ID - rootID, err := f.getRootID() + rootID, err := f.getRootID(ctx) if err != nil { if gerr, ok := errors.Cause(err).(*googleapi.Error); ok && gerr.Code == 404 { // 404 means that this scope does not have permission to get the @@ -1328,26 +1331,26 @@ func (f *Fs) newLinkObject(remote string, info *drive.File, extension, exportMim // newObjectWithInfo creates an fs.Object for any drive.File // // When the drive.File cannot be represented as an fs.Object it will return (nil, nil). -func (f *Fs) newObjectWithInfo(remote string, info *drive.File) (fs.Object, error) { +func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *drive.File) (fs.Object, error) { // If item has MD5 sum or a length it is a file stored on drive if info.Md5Checksum != "" || info.Size > 0 { return f.newRegularObject(remote, info), nil } - extension, exportName, exportMimeType, isDocument := f.findExportFormat(info) - return f.newObjectWithExportInfo(remote, info, extension, exportName, exportMimeType, isDocument) + extension, exportName, exportMimeType, isDocument := f.findExportFormat(ctx, info) + return f.newObjectWithExportInfo(ctx, remote, info, extension, exportName, exportMimeType, isDocument) } // newObjectWithExportInfo creates an fs.Object for any drive.File and the result of findExportFormat // // When the drive.File cannot be represented as an fs.Object it will return (nil, nil). func (f *Fs) newObjectWithExportInfo( - remote string, info *drive.File, + ctx context.Context, remote string, info *drive.File, extension, exportName, exportMimeType string, isDocument bool) (o fs.Object, err error) { // Note that resolveShortcut will have been called already if // we are being called from a listing. However the drive.Item // will have been resolved so this will do nothing. - info, err = f.resolveShortcut(info) + info, err = f.resolveShortcut(ctx, info) if err != nil { return nil, errors.Wrap(err, "new object") } @@ -1395,7 +1398,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { } remote = remote[:len(remote)-len(extension)] - obj, err := f.newObjectWithExportInfo(remote, info, extension, exportName, exportMimeType, isDocument) + obj, err := f.newObjectWithExportInfo(ctx, remote, info, extension, exportName, exportMimeType, isDocument) switch { case err != nil: return nil, err @@ -1412,7 +1415,7 @@ func (f *Fs) FindLeaf(ctx context.Context, pathID, leaf string) (pathIDOut strin pathID = actualID(pathID) found, err = f.list(ctx, []string{pathID}, leaf, true, false, f.opt.TrashedOnly, false, func(item *drive.File) bool { if !f.opt.SkipGdocs { - _, exportName, _, isDocument := f.findExportFormat(item) + _, exportName, _, isDocument := f.findExportFormat(ctx, item) if exportName == leaf { pathIDOut = item.Id return true @@ -1448,7 +1451,7 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (newID string, Fields("id"). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return "", err @@ -1483,7 +1486,7 @@ func linkTemplate(mt string) *template.Template { }) return _linkTemplates[mt] } -func (f *Fs) fetchFormats() { +func (f *Fs) fetchFormats(ctx context.Context) { fetchFormatsOnce.Do(func() { var about *drive.About var err error @@ -1491,7 +1494,7 @@ func (f *Fs) fetchFormats() { about, err = f.svc.About.Get(). Fields("exportFormats,importFormats"). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { fs.Errorf(f, "Failed to get Drive exportFormats and importFormats: %v", err) @@ -1508,8 +1511,8 @@ func (f *Fs) fetchFormats() { // if necessary. // // if the fetch fails then it will not export any drive formats -func (f *Fs) exportFormats() map[string][]string { - f.fetchFormats() +func (f *Fs) exportFormats(ctx context.Context) map[string][]string { + f.fetchFormats(ctx) return _exportFormats } @@ -1517,8 +1520,8 @@ func (f *Fs) exportFormats() map[string][]string { // if necessary. // // if the fetch fails then it will not import any drive formats -func (f *Fs) importFormats() map[string][]string { - f.fetchFormats() +func (f *Fs) importFormats(ctx context.Context) map[string][]string { + f.fetchFormats(ctx) return _importFormats } @@ -1527,9 +1530,9 @@ func (f *Fs) importFormats() map[string][]string { // // Look through the exportExtensions and find the first format that can be // converted. If none found then return ("", "", false) -func (f *Fs) findExportFormatByMimeType(itemMimeType string) ( +func (f *Fs) findExportFormatByMimeType(ctx context.Context, itemMimeType string) ( extension, mimeType string, isDocument bool) { - exportMimeTypes, isDocument := f.exportFormats()[itemMimeType] + exportMimeTypes, isDocument := f.exportFormats(ctx)[itemMimeType] if isDocument { for _, _extension := range f.exportExtensions { _mimeType := mime.TypeByExtension(_extension) @@ -1556,8 +1559,8 @@ func (f *Fs) findExportFormatByMimeType(itemMimeType string) ( // // Look through the exportExtensions and find the first format that can be // converted. If none found then return ("", "", "", false) -func (f *Fs) findExportFormat(item *drive.File) (extension, filename, mimeType string, isDocument bool) { - extension, mimeType, isDocument = f.findExportFormatByMimeType(item.MimeType) +func (f *Fs) findExportFormat(ctx context.Context, item *drive.File) (extension, filename, mimeType string, isDocument bool) { + extension, mimeType, isDocument = f.findExportFormatByMimeType(ctx, item.MimeType) if extension != "" { filename = item.Name + extension } @@ -1569,9 +1572,9 @@ func (f *Fs) findExportFormat(item *drive.File) (extension, filename, mimeType s // MIME type is returned // // When no match is found "" is returned. -func (f *Fs) findImportFormat(mimeType string) string { +func (f *Fs) findImportFormat(ctx context.Context, mimeType string) string { mimeType = fixMimeType(mimeType) - ifs := f.importFormats() + ifs := f.importFormats(ctx) for _, mt := range f.importMimeTypes { if mt == mimeType { importMimeTypes := ifs[mimeType] @@ -1604,7 +1607,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e var iErr error _, err = f.list(ctx, []string{directoryID}, "", false, false, f.opt.TrashedOnly, false, func(item *drive.File) bool { - entry, err := f.itemToDirEntry(path.Join(dir, item.Name), item) + entry, err := f.itemToDirEntry(ctx, path.Join(dir, item.Name), item) if err != nil { iErr = err return true @@ -1717,7 +1720,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listRE } } remote := path.Join(paths[i], item.Name) - entry, err := f.itemToDirEntry(remote, item) + entry, err := f.itemToDirEntry(ctx, remote, item) if err != nil { iErr = err return true @@ -1982,7 +1985,7 @@ func isShortcut(item *drive.File) bool { // Note that we assume shortcuts can't point to shortcuts. Google // drive web interface doesn't offer the option to create a shortcut // to a shortcut. The documentation is silent on the issue. -func (f *Fs) resolveShortcut(item *drive.File) (newItem *drive.File, err error) { +func (f *Fs) resolveShortcut(ctx context.Context, item *drive.File) (newItem *drive.File, err error) { if f.opt.SkipShortcuts || item.MimeType != shortcutMimeType { return item, nil } @@ -1990,7 +1993,7 @@ func (f *Fs) resolveShortcut(item *drive.File) (newItem *drive.File, err error) fs.Errorf(nil, "Expecting shortcutDetails in %v", item) return item, nil } - newItem, err = f.getFile(item.ShortcutDetails.TargetId, f.fileFields) + newItem, err = f.getFile(ctx, item.ShortcutDetails.TargetId, f.fileFields) if err != nil { if gerr, ok := errors.Cause(err).(*googleapi.Error); ok && gerr.Code == 404 { // 404 means dangling shortcut, so just return the shortcut with the mime type mangled @@ -2012,7 +2015,7 @@ func (f *Fs) resolveShortcut(item *drive.File) (newItem *drive.File, err error) // itemToDirEntry converts a drive.File to an fs.DirEntry. // When the drive.File cannot be represented as an fs.DirEntry // (nil, nil) is returned. -func (f *Fs) itemToDirEntry(remote string, item *drive.File) (entry fs.DirEntry, err error) { +func (f *Fs) itemToDirEntry(ctx context.Context, remote string, item *drive.File) (entry fs.DirEntry, err error) { switch { case item.MimeType == driveFolderType: // cache the directory ID for later lookups @@ -2026,7 +2029,7 @@ func (f *Fs) itemToDirEntry(remote string, item *drive.File) (entry fs.DirEntry, case f.opt.AuthOwnerOnly && !isAuthOwned(item): // ignore object default: - entry, err = f.newObjectWithInfo(remote, item) + entry, err = f.newObjectWithInfo(ctx, remote, item) if err == fs.ErrorObjectNotFound { return nil, nil } @@ -2093,12 +2096,12 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, importMimeType := "" if f.importMimeTypes != nil && !f.opt.SkipGdocs { - importMimeType = f.findImportFormat(srcMimeType) + importMimeType = f.findImportFormat(ctx, srcMimeType) if isInternalMimeType(importMimeType) { remote = remote[:len(remote)-len(srcExt)] - exportExt, _, _ = f.findExportFormatByMimeType(importMimeType) + exportExt, _, _ = f.findExportFormatByMimeType(ctx, importMimeType) if exportExt == "" { return nil, errors.Errorf("No export format found for %q", importMimeType) } @@ -2129,7 +2132,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, SupportsAllDrives(true). KeepRevisionForever(f.opt.KeepRevisionForever). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, err @@ -2141,7 +2144,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, return nil, err } } - return f.newObjectWithInfo(remote, info) + return f.newObjectWithInfo(ctx, remote, info) } // MergeDirs merges the contents of all the directories passed @@ -2184,7 +2187,7 @@ func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { Fields(""). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return errors.Wrapf(err, "MergeDirs move failed on %q in %v", info.Name, srcDir) @@ -2224,7 +2227,7 @@ func (f *Fs) delete(ctx context.Context, id string, useTrash bool) error { SupportsAllDrives(true). Do() } - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) } @@ -2337,7 +2340,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, if isDoc { // preserve the description on copy for docs - info, err := f.getFile(actualID(srcObj.id), "description") + info, err := f.getFile(ctx, actualID(srcObj.id), "description") if err != nil { fs.Errorf(srcObj, "Failed to read description for Google Doc: %v", err) } else { @@ -2359,12 +2362,12 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, SupportsAllDrives(true). KeepRevisionForever(f.opt.KeepRevisionForever). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, err } - newObject, err := f.newObjectWithInfo(remote, info) + newObject, err := f.newObjectWithInfo(ctx, remote, info) if err != nil { return nil, err } @@ -2458,7 +2461,7 @@ func (f *Fs) CleanUp(ctx context.Context) error { } err := f.pacer.Call(func() (bool, error) { err := f.svc.Files.EmptyTrash().Context(ctx).Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { @@ -2476,7 +2479,7 @@ func (f *Fs) teamDriveOK(ctx context.Context) (err error) { var td *drive.Drive err = f.pacer.Call(func() (bool, error) { td, err = f.svc.Drives.Get(f.opt.TeamDriveID).Fields("name,id,capabilities,createdTime,restrictions").Context(ctx).Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "failed to get Shared Drive info") @@ -2499,7 +2502,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { var err error err = f.pacer.Call(func() (bool, error) { about, err = f.svc.About.Get().Fields("storageQuota").Context(ctx).Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get Drive storageQuota") @@ -2572,13 +2575,13 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, Fields(partialFields). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, err } - return f.newObjectWithInfo(remote, info) + return f.newObjectWithInfo(ctx, remote, info) } // PublicLink adds a "readable by anyone with link" permission on the given file or folder. @@ -2609,7 +2612,7 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, Fields(""). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return "", err @@ -2652,7 +2655,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string Fields(""). SupportsAllDrives(true). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return err @@ -2670,7 +2673,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { go func() { // get the StartPageToken early so all changes from now on get processed - startPageToken, err := f.changeNotifyStartPageToken() + startPageToken, err := f.changeNotifyStartPageToken(ctx) if err != nil { fs.Infof(f, "Failed to get StartPageToken: %s", err) } @@ -2695,7 +2698,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT } case <-tickerC: if startPageToken == "" { - startPageToken, err = f.changeNotifyStartPageToken() + startPageToken, err = f.changeNotifyStartPageToken(ctx) if err != nil { fs.Infof(f, "Failed to get StartPageToken: %s", err) continue @@ -2710,7 +2713,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT } }() } -func (f *Fs) changeNotifyStartPageToken() (pageToken string, err error) { +func (f *Fs) changeNotifyStartPageToken(ctx context.Context) (pageToken string, err error) { var startPageToken *drive.StartPageToken err = f.pacer.Call(func() (bool, error) { changes := f.svc.Changes.GetStartPageToken().SupportsAllDrives(true) @@ -2718,7 +2721,7 @@ func (f *Fs) changeNotifyStartPageToken() (pageToken string, err error) { changes.DriveId(f.opt.TeamDriveID) } startPageToken, err = changes.Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return @@ -2747,7 +2750,7 @@ func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs. changesCall.Spaces("appDataFolder") } changeList, err = changesCall.Context(ctx).Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return @@ -2944,7 +2947,7 @@ func (f *Fs) makeShortcut(ctx context.Context, srcPath string, dstFs *Fs, dstPat SupportsAllDrives(true). KeepRevisionForever(dstFs.opt.KeepRevisionForever). Do() - return dstFs.shouldRetry(err) + return dstFs.shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "shortcut creation failed") @@ -2952,7 +2955,7 @@ func (f *Fs) makeShortcut(ctx context.Context, srcPath string, dstFs *Fs, dstPat if isDir { return nil, nil } - return dstFs.newObjectWithInfo(dstPath, info) + return dstFs.newObjectWithInfo(ctx, dstPath, info) } // List all team drives @@ -2964,7 +2967,7 @@ func (f *Fs) listTeamDrives(ctx context.Context) (drives []*drive.TeamDrive, err var teamDrives *drive.TeamDriveList err = f.pacer.Call(func() (bool, error) { teamDrives, err = listTeamDrives.Context(ctx).Do() - return defaultFs.shouldRetry(err) + return defaultFs.shouldRetry(ctx, err) }) if err != nil { return drives, errors.Wrap(err, "listing Team Drives failed") @@ -3007,7 +3010,7 @@ func (f *Fs) unTrash(ctx context.Context, dir string, directoryID string, recurs SupportsAllDrives(true). Fields("trashed"). Do() - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { err = errors.Wrap(err, "failed to restore") @@ -3049,7 +3052,7 @@ func (f *Fs) unTrashDir(ctx context.Context, dir string, recurse bool) (r unTras // copy file with id to dest func (f *Fs) copyID(ctx context.Context, id, dest string) (err error) { - info, err := f.getFile(id, f.fileFields) + info, err := f.getFile(ctx, id, f.fileFields) if err != nil { return errors.Wrap(err, "couldn't find id") } @@ -3057,7 +3060,7 @@ func (f *Fs) copyID(ctx context.Context, id, dest string) (err error) { return errors.Errorf("can't copy directory use: rclone copy --drive-root-folder-id %s %s %s", id, fs.ConfigString(f), dest) } info.Name = f.opt.Enc.ToStandardName(info.Name) - o, err := f.newObjectWithInfo(info.Name, info) + o, err := f.newObjectWithInfo(ctx, info.Name, info) if err != nil { return err } @@ -3358,7 +3361,7 @@ func (f *Fs) getRemoteInfoWithExport(ctx context.Context, remote string) ( found, err := f.list(ctx, []string{directoryID}, leaf, false, false, f.opt.TrashedOnly, false, func(item *drive.File) bool { if !f.opt.SkipGdocs { - extension, exportName, exportMimeType, isDocument = f.findExportFormat(item) + extension, exportName, exportMimeType, isDocument = f.findExportFormat(ctx, item) if exportName == leaf { info = item return true @@ -3410,7 +3413,7 @@ func (o *baseObject) SetModTime(ctx context.Context, modTime time.Time) error { Fields(partialFields). SupportsAllDrives(true). Do() - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) if err != nil { return err @@ -3448,7 +3451,7 @@ func (o *baseObject) httpResponse(ctx context.Context, url, method string, optio _ = res.Body.Close() // ignore error } } - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) if err != nil { return req, nil, err @@ -3541,7 +3544,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read Fields("downloadUrl"). SupportsAllDrives(true). Do() - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) if err == nil { fs.Debugf(o, "Using v2 download: %v", v2File.DownloadUrl) @@ -3622,7 +3625,7 @@ func (o *baseObject) update(ctx context.Context, updateInfo *drive.File, uploadM SupportsAllDrives(true). KeepRevisionForever(o.fs.opt.KeepRevisionForever). Do() - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) return } @@ -3665,7 +3668,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if err != nil { return err } - newO, err := o.fs.newObjectWithInfo(src.Remote(), info) + newO, err := o.fs.newObjectWithInfo(ctx, src.Remote(), info) if err != nil { return err } @@ -3689,7 +3692,7 @@ func (o *documentObject) Update(ctx context.Context, in io.Reader, src fs.Object if o.fs.importMimeTypes == nil || o.fs.opt.SkipGdocs { return errors.Errorf("can't update google document type without --drive-import-formats") } - importMimeType = o.fs.findImportFormat(updateInfo.MimeType) + importMimeType = o.fs.findImportFormat(ctx, updateInfo.MimeType) if importMimeType == "" { return errors.Errorf("no import format found for %q", srcMimeType) } @@ -3706,7 +3709,7 @@ func (o *documentObject) Update(ctx context.Context, in io.Reader, src fs.Object remote := src.Remote() remote = remote[:len(remote)-o.extLen] - newO, err := o.fs.newObjectWithInfo(remote, info) + newO, err := o.fs.newObjectWithInfo(ctx, remote, info) if err != nil { return err } diff --git a/backend/drive/drive_internal_test.go b/backend/drive/drive_internal_test.go index e78a717a6..2858a2ba5 100644 --- a/backend/drive/drive_internal_test.go +++ b/backend/drive/drive_internal_test.go @@ -111,6 +111,7 @@ func TestInternalParseExtensions(t *testing.T) { } func TestInternalFindExportFormat(t *testing.T) { + ctx := context.Background() item := &drive.File{ Name: "file", MimeType: "application/vnd.google-apps.document", @@ -128,7 +129,7 @@ func TestInternalFindExportFormat(t *testing.T) { } { f := new(Fs) f.exportExtensions = test.extensions - gotExtension, gotFilename, gotMimeType, gotIsDocument := f.findExportFormat(item) + gotExtension, gotFilename, gotMimeType, gotIsDocument := f.findExportFormat(ctx, item) assert.Equal(t, test.wantExtension, gotExtension) if test.wantExtension != "" { assert.Equal(t, item.Name+gotExtension, gotFilename) diff --git a/backend/drive/upload.go b/backend/drive/upload.go index 14c7baa64..df6454429 100644 --- a/backend/drive/upload.go +++ b/backend/drive/upload.go @@ -94,7 +94,7 @@ func (f *Fs) Upload(ctx context.Context, in io.Reader, size int64, contentType, defer googleapi.CloseBody(res) err = googleapi.CheckResponse(res) } - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, err @@ -202,7 +202,7 @@ func (rx *resumableUpload) Upload(ctx context.Context) (*drive.File, error) { err = rx.f.pacer.Call(func() (bool, error) { fs.Debugf(rx.remote, "Sending chunk %d length %d", start, reqSize) StatusCode, err = rx.transferChunk(ctx, start, chunk, reqSize) - again, err := rx.f.shouldRetry(err) + again, err := rx.f.shouldRetry(ctx, err) if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK { again = false err = nil diff --git a/backend/mailru/mailru.go b/backend/mailru/mailru.go index 3e8aad9c4..0398dea32 100644 --- a/backend/mailru/mailru.go +++ b/backend/mailru/mailru.go @@ -234,7 +234,10 @@ var retryErrorCodes = []int{ // shouldRetry returns a boolean as to whether this response and err // deserve to be retried. It returns the err as a convenience. // Retries password authorization (once) in a special case of access denied. -func shouldRetry(res *http.Response, err error, f *Fs, opts *rest.Opts) (bool, error) { +func shouldRetry(ctx context.Context, res *http.Response, err error, f *Fs, opts *rest.Opts) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } if res != nil && res.StatusCode == 403 && f.opt.Password != "" && !f.passFailed { reAuthErr := f.reAuthorize(opts, err) return reAuthErr == nil, err // return an original error @@ -600,7 +603,7 @@ func (f *Fs) readItemMetaData(ctx context.Context, path string) (entry fs.DirEnt var info api.ItemInfoResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { @@ -736,7 +739,7 @@ func (f *Fs) listM1(ctx context.Context, dirPath string, offset int, limit int) ) err = f.pacer.Call(func() (bool, error) { res, err = f.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { @@ -800,7 +803,7 @@ func (f *Fs) listBin(ctx context.Context, dirPath string, depth int) (entries fs var res *http.Response err = f.pacer.Call(func() (bool, error) { res, err = f.srv.Call(ctx, &opts) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { closeBody(res) @@ -1073,7 +1076,7 @@ func (f *Fs) CreateDir(ctx context.Context, path string) error { var res *http.Response err = f.pacer.Call(func() (bool, error) { res, err = f.srv.Call(ctx, &opts) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { closeBody(res) @@ -1216,7 +1219,7 @@ func (f *Fs) delete(ctx context.Context, path string, hardDelete bool) error { var response api.GenericResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &response) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) switch { @@ -1288,7 +1291,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, var response api.GenericBodyResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &response) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { @@ -1392,7 +1395,7 @@ func (f *Fs) moveItemBin(ctx context.Context, srcPath, dstPath, opName string) e var res *http.Response err = f.pacer.Call(func() (bool, error) { res, err = f.srv.Call(ctx, &opts) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { closeBody(res) @@ -1483,7 +1486,7 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, var response api.GenericBodyResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &response) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err == nil && response.Body != "" { @@ -1524,7 +1527,7 @@ func (f *Fs) CleanUp(ctx context.Context) error { var response api.CleanupResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &response) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { return err @@ -1557,7 +1560,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { var info api.UserInfoResponse err = f.pacer.Call(func() (bool, error) { res, err := f.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(res, err, f, &opts) + return shouldRetry(ctx, res, err, f, &opts) }) if err != nil { return nil, err @@ -2076,7 +2079,7 @@ func (o *Object) addFileMetaData(ctx context.Context, overwrite bool) error { var res *http.Response err = o.fs.pacer.Call(func() (bool, error) { res, err = o.fs.srv.Call(ctx, &opts) - return shouldRetry(res, err, o.fs, &opts) + return shouldRetry(ctx, res, err, o.fs, &opts) }) if err != nil { closeBody(res) @@ -2172,7 +2175,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read } opts.RootURL = server res, err = o.fs.srv.Call(ctx, &opts) - return shouldRetry(res, err, o.fs, &opts) + return shouldRetry(ctx, res, err, o.fs, &opts) }) if err != nil { if res != nil && res.Body != nil { diff --git a/backend/mega/mega.go b/backend/mega/mega.go index b65e94fc5..36d41842e 100644 --- a/backend/mega/mega.go +++ b/backend/mega/mega.go @@ -30,6 +30,7 @@ import ( "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/obscure" + "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/encoder" @@ -158,7 +159,10 @@ func parsePath(path string) (root string) { // shouldRetry returns a boolean as to whether this err deserves to be // retried. It returns the err as a convenience -func shouldRetry(err error) (bool, error) { +func shouldRetry(ctx context.Context, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } // Let the mega library handle the low level retries return false, err /* @@ -171,8 +175,8 @@ func shouldRetry(err error) (bool, error) { } // readMetaDataForPath reads the metadata from the path -func (f *Fs) readMetaDataForPath(remote string) (info *mega.Node, err error) { - rootNode, err := f.findRoot(false) +func (f *Fs) readMetaDataForPath(ctx context.Context, remote string) (info *mega.Node, err error) { + rootNode, err := f.findRoot(ctx, false) if err != nil { return nil, err } @@ -237,7 +241,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e }).Fill(ctx, f) // Find the root node and check if it is a file or not - _, err = f.findRoot(false) + _, err = f.findRoot(ctx, false) switch err { case nil: // root node found and is a directory @@ -307,8 +311,8 @@ func (f *Fs) findObject(rootNode *mega.Node, file string) (node *mega.Node, err // lookupDir looks up the node for the directory of the name given // // if create is true it tries to create the root directory if not found -func (f *Fs) lookupDir(dir string) (*mega.Node, error) { - rootNode, err := f.findRoot(false) +func (f *Fs) lookupDir(ctx context.Context, dir string) (*mega.Node, error) { + rootNode, err := f.findRoot(ctx, false) if err != nil { return nil, err } @@ -316,15 +320,15 @@ func (f *Fs) lookupDir(dir string) (*mega.Node, error) { } // lookupParentDir finds the parent node for the remote passed in -func (f *Fs) lookupParentDir(remote string) (dirNode *mega.Node, leaf string, err error) { +func (f *Fs) lookupParentDir(ctx context.Context, remote string) (dirNode *mega.Node, leaf string, err error) { parent, leaf := path.Split(remote) - dirNode, err = f.lookupDir(parent) + dirNode, err = f.lookupDir(ctx, parent) return dirNode, leaf, err } // mkdir makes the directory and any parent directories for the // directory of the name given -func (f *Fs) mkdir(rootNode *mega.Node, dir string) (node *mega.Node, err error) { +func (f *Fs) mkdir(ctx context.Context, rootNode *mega.Node, dir string) (node *mega.Node, err error) { f.mkdirMu.Lock() defer f.mkdirMu.Unlock() @@ -358,7 +362,7 @@ func (f *Fs) mkdir(rootNode *mega.Node, dir string) (node *mega.Node, err error) // create directory called name in node err = f.pacer.Call(func() (bool, error) { node, err = f.srv.CreateDir(name, node) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "mkdir create node failed") @@ -368,20 +372,20 @@ func (f *Fs) mkdir(rootNode *mega.Node, dir string) (node *mega.Node, err error) } // mkdirParent creates the parent directory of remote -func (f *Fs) mkdirParent(remote string) (dirNode *mega.Node, leaf string, err error) { - rootNode, err := f.findRoot(true) +func (f *Fs) mkdirParent(ctx context.Context, remote string) (dirNode *mega.Node, leaf string, err error) { + rootNode, err := f.findRoot(ctx, true) if err != nil { return nil, "", err } parent, leaf := path.Split(remote) - dirNode, err = f.mkdir(rootNode, parent) + dirNode, err = f.mkdir(ctx, rootNode, parent) return dirNode, leaf, err } // findRoot looks up the root directory node and returns it. // // if create is true it tries to create the root directory if not found -func (f *Fs) findRoot(create bool) (*mega.Node, error) { +func (f *Fs) findRoot(ctx context.Context, create bool) (*mega.Node, error) { f.rootNodeMu.Lock() defer f.rootNodeMu.Unlock() @@ -403,7 +407,7 @@ func (f *Fs) findRoot(create bool) (*mega.Node, error) { } //..not found so create the root directory - f._rootNode, err = f.mkdir(absRoot, f.root) + f._rootNode, err = f.mkdir(ctx, absRoot, f.root) return f._rootNode, err } @@ -433,7 +437,7 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) { fs.Debugf(f, "Deleting trash %q", f.opt.Enc.ToStandardName(item.GetName())) deleteErr := f.pacer.Call(func() (bool, error) { err := f.srv.Delete(item, true) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if deleteErr != nil { err = deleteErr @@ -447,7 +451,7 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) { // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. -func (f *Fs) newObjectWithInfo(remote string, info *mega.Node) (fs.Object, error) { +func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *mega.Node) (fs.Object, error) { o := &Object{ fs: f, remote: remote, @@ -457,7 +461,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *mega.Node) (fs.Object, error // Set info err = o.setMetaData(info) } else { - err = o.readMetaData() // reads info and meta, returning an error + err = o.readMetaData(ctx) // reads info and meta, returning an error } if err != nil { return nil, err @@ -468,7 +472,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *mega.Node) (fs.Object, error // NewObject finds the Object at remote. If it can't be found // it returns the error fs.ErrorObjectNotFound. func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { - return f.newObjectWithInfo(remote, nil) + return f.newObjectWithInfo(ctx, remote, nil) } // list the objects into the function supplied @@ -506,7 +510,7 @@ func (f *Fs) list(ctx context.Context, dir *mega.Node, fn listFn) (found bool, e // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - dirNode, err := f.lookupDir(dir) + dirNode, err := f.lookupDir(ctx, dir) if err != nil { return nil, err } @@ -518,7 +522,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e d := fs.NewDir(remote, info.GetTimeStamp()).SetID(info.GetHash()) entries = append(entries, d) case mega.FILE: - o, err := f.newObjectWithInfo(remote, info) + o, err := f.newObjectWithInfo(ctx, remote, info) if err != nil { iErr = err return true @@ -542,8 +546,8 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Returns the dirNode, object, leaf and error // // Used to create new objects -func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Object, dirNode *mega.Node, leaf string, err error) { - dirNode, leaf, err = f.mkdirParent(remote) +func (f *Fs) createObject(ctx context.Context, remote string, modTime time.Time, size int64) (o *Object, dirNode *mega.Node, leaf string, err error) { + dirNode, leaf, err = f.mkdirParent(ctx, remote) if err != nil { return nil, nil, leaf, err } @@ -565,7 +569,7 @@ func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Obje // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - existingObj, err := f.newObjectWithInfo(src.Remote(), nil) + existingObj, err := f.newObjectWithInfo(ctx, src.Remote(), nil) switch err { case nil: return existingObj, existingObj.Update(ctx, in, src, options...) @@ -591,7 +595,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, size := src.Size() modTime := src.ModTime(ctx) - o, _, _, err := f.createObject(remote, modTime, size) + o, _, _, err := f.createObject(ctx, remote, modTime, size) if err != nil { return nil, err } @@ -600,30 +604,30 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, // Mkdir creates the directory if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - rootNode, err := f.findRoot(true) + rootNode, err := f.findRoot(ctx, true) if err != nil { return err } - _, err = f.mkdir(rootNode, dir) + _, err = f.mkdir(ctx, rootNode, dir) return errors.Wrap(err, "Mkdir failed") } // deleteNode removes a file or directory, observing useTrash -func (f *Fs) deleteNode(node *mega.Node) (err error) { +func (f *Fs) deleteNode(ctx context.Context, node *mega.Node) (err error) { err = f.pacer.Call(func() (bool, error) { err = f.srv.Delete(node, f.opt.HardDelete) - return shouldRetry(err) + return shouldRetry(ctx, err) }) return err } // purgeCheck removes the directory dir, if check is set then it // refuses to do so if it has anything in -func (f *Fs) purgeCheck(dir string, check bool) error { +func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error { f.mkdirMu.Lock() defer f.mkdirMu.Unlock() - rootNode, err := f.findRoot(false) + rootNode, err := f.findRoot(ctx, false) if err != nil { return err } @@ -644,7 +648,7 @@ func (f *Fs) purgeCheck(dir string, check bool) error { waitEvent := f.srv.WaitEventsStart() - err = f.deleteNode(dirNode) + err = f.deleteNode(ctx, dirNode) if err != nil { return errors.Wrap(err, "delete directory node failed") } @@ -662,7 +666,7 @@ func (f *Fs) purgeCheck(dir string, check bool) error { // // Returns an error if it isn't empty func (f *Fs) Rmdir(ctx context.Context, dir string) error { - return f.purgeCheck(dir, true) + return f.purgeCheck(ctx, dir, true) } // Precision return the precision of this Fs @@ -676,13 +680,13 @@ func (f *Fs) Precision() time.Duration { // deleting all the files quicker than just running Remove() on the // result of List() func (f *Fs) Purge(ctx context.Context, dir string) error { - return f.purgeCheck(dir, false) + return f.purgeCheck(ctx, dir, false) } // move a file or folder (srcFs, srcRemote, info) to (f, dstRemote) // // info will be updates -func (f *Fs) move(dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node) (err error) { +func (f *Fs) move(ctx context.Context, dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node) (err error) { var ( dstFs = f srcDirNode, dstDirNode *mega.Node @@ -692,12 +696,12 @@ func (f *Fs) move(dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node if dstRemote != "" { // lookup or create the destination parent directory - dstDirNode, dstLeaf, err = dstFs.mkdirParent(dstRemote) + dstDirNode, dstLeaf, err = dstFs.mkdirParent(ctx, dstRemote) } else { // find or create the parent of the root directory absRoot := dstFs.srv.FS.GetRoot() dstParent, dstLeaf = path.Split(dstFs.root) - dstDirNode, err = dstFs.mkdir(absRoot, dstParent) + dstDirNode, err = dstFs.mkdir(ctx, absRoot, dstParent) } if err != nil { return errors.Wrap(err, "server-side move failed to make dst parent dir") @@ -705,7 +709,7 @@ func (f *Fs) move(dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node if srcRemote != "" { // lookup the existing parent directory - srcDirNode, srcLeaf, err = srcFs.lookupParentDir(srcRemote) + srcDirNode, srcLeaf, err = srcFs.lookupParentDir(ctx, srcRemote) } else { // lookup the existing root parent absRoot := srcFs.srv.FS.GetRoot() @@ -721,7 +725,7 @@ func (f *Fs) move(dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node //log.Printf("move src %p %q dst %p %q", srcDirNode, srcDirNode.GetName(), dstDirNode, dstDirNode.GetName()) err = f.pacer.Call(func() (bool, error) { err = f.srv.Move(info, dstDirNode) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "server-side move failed") @@ -735,7 +739,7 @@ func (f *Fs) move(dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node //log.Printf("rename %q to %q", srcLeaf, dstLeaf) err = f.pacer.Call(func() (bool, error) { err = f.srv.Rename(info, f.opt.Enc.FromStandardName(dstLeaf)) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "server-side rename failed") @@ -767,7 +771,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, } // Do the move - err := f.move(remote, srcObj.fs, srcObj.remote, srcObj.info) + err := f.move(ctx, remote, srcObj.fs, srcObj.remote, srcObj.info) if err != nil { return nil, err } @@ -798,13 +802,13 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string } // find the source - info, err := srcFs.lookupDir(srcRemote) + info, err := srcFs.lookupDir(ctx, srcRemote) if err != nil { return err } // check the destination doesn't exist - _, err = dstFs.lookupDir(dstRemote) + _, err = dstFs.lookupDir(ctx, dstRemote) if err == nil { return fs.ErrorDirExists } else if err != fs.ErrorDirNotFound { @@ -812,7 +816,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string } // Do the move - err = f.move(dstRemote, srcFs, srcRemote, info) + err = f.move(ctx, dstRemote, srcFs, srcRemote, info) if err != nil { return err } @@ -838,7 +842,7 @@ func (f *Fs) Hashes() hash.Set { // PublicLink generates a public link to the remote path (usually readable by anyone) func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (link string, err error) { - root, err := f.findRoot(false) + root, err := f.findRoot(ctx, false) if err != nil { return "", errors.Wrap(err, "PublicLink failed to find root node") } @@ -886,7 +890,7 @@ func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { fs.Infof(srcDir, "merging %q", f.opt.Enc.ToStandardName(info.GetName())) err = f.pacer.Call(func() (bool, error) { err = f.srv.Move(info, dstDirNode) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrapf(err, "MergeDirs move failed on %q in %v", f.opt.Enc.ToStandardName(info.GetName()), srcDir) @@ -894,7 +898,7 @@ func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { } // rmdir (into trash) the now empty source directory fs.Infof(srcDir, "removing empty directory") - err = f.deleteNode(srcDirNode) + err = f.deleteNode(ctx, srcDirNode) if err != nil { return errors.Wrapf(err, "MergeDirs move failed to rmdir %q", srcDir) } @@ -908,7 +912,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { var err error err = f.pacer.Call(func() (bool, error) { q, err = f.srv.GetQuota() - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get Mega Quota") @@ -963,11 +967,11 @@ func (o *Object) setMetaData(info *mega.Node) (err error) { // readMetaData gets the metadata if it hasn't already been fetched // // it also sets the info -func (o *Object) readMetaData() (err error) { +func (o *Object) readMetaData(ctx context.Context) (err error) { if o.info != nil { return nil } - info, err := o.fs.readMetaDataForPath(o.remote) + info, err := o.fs.readMetaDataForPath(ctx, o.remote) if err != nil { if err == fs.ErrorDirNotFound { err = fs.ErrorObjectNotFound @@ -998,6 +1002,7 @@ func (o *Object) Storable() bool { // openObject represents a download in progress type openObject struct { + ctx context.Context mu sync.Mutex o *Object d *mega.Download @@ -1008,14 +1013,14 @@ type openObject struct { } // get the next chunk -func (oo *openObject) getChunk() (err error) { +func (oo *openObject) getChunk(ctx context.Context) (err error) { if oo.id >= oo.d.Chunks() { return io.EOF } var chunk []byte err = oo.o.fs.pacer.Call(func() (bool, error) { chunk, err = oo.d.DownloadChunk(oo.id) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return err @@ -1045,7 +1050,7 @@ func (oo *openObject) Read(p []byte) (n int, err error) { oo.skip -= int64(size) } if len(oo.chunk) == 0 { - err = oo.getChunk() + err = oo.getChunk(oo.ctx) if err != nil { return 0, err } @@ -1068,7 +1073,7 @@ func (oo *openObject) Close() (err error) { } err = oo.o.fs.pacer.Call(func() (bool, error) { err = oo.d.Finish() - return shouldRetry(err) + return shouldRetry(oo.ctx, err) }) if err != nil { return errors.Wrap(err, "failed to finish download") @@ -1096,13 +1101,14 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var d *mega.Download err = o.fs.pacer.Call(func() (bool, error) { d, err = o.fs.srv.NewDownload(o.info) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "open download file failed") } oo := &openObject{ + ctx: ctx, o: o, d: d, skip: offset, @@ -1125,7 +1131,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op remote := o.Remote() // Create the parent directory - dirNode, leaf, err := o.fs.mkdirParent(remote) + dirNode, leaf, err := o.fs.mkdirParent(ctx, remote) if err != nil { return errors.Wrap(err, "update make parent dir failed") } @@ -1133,7 +1139,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var u *mega.Upload err = o.fs.pacer.Call(func() (bool, error) { u, err = o.fs.srv.NewUpload(dirNode, o.fs.opt.Enc.FromStandardName(leaf), size) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "upload file failed to create session") @@ -1154,7 +1160,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op err = o.fs.pacer.Call(func() (bool, error) { err = u.UploadChunk(id, chunk) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "upload file failed to upload chunk") @@ -1165,7 +1171,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var info *mega.Node err = o.fs.pacer.Call(func() (bool, error) { info, err = u.Finish() - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "failed to finish upload") @@ -1173,7 +1179,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // If the upload succeeded and the original object existed, then delete it if o.info != nil { - err = o.fs.deleteNode(o.info) + err = o.fs.deleteNode(ctx, o.info) if err != nil { return errors.Wrap(err, "upload failed to remove old version") } @@ -1185,7 +1191,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove an object func (o *Object) Remove(ctx context.Context) error { - err := o.fs.deleteNode(o.info) + err := o.fs.deleteNode(ctx, o.info) if err != nil { return errors.Wrap(err, "Remove object failed") } diff --git a/backend/onedrive/onedrive.go b/backend/onedrive/onedrive.go index 2973ba8ce..17e505a60 100755 --- a/backend/onedrive/onedrive.go +++ b/backend/onedrive/onedrive.go @@ -549,7 +549,10 @@ var errAsyncJobAccessDenied = errors.New("async job failed - access denied") // shouldRetry returns a boolean as to whether this resp and err // deserve to be retried. It returns the err as a convenience -func shouldRetry(resp *http.Response, err error) (bool, error) { +func shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } retry := false if resp != nil { switch resp.StatusCode { @@ -596,7 +599,7 @@ func (f *Fs) readMetaDataForPathRelativeToID(ctx context.Context, normalizedID s err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) return info, resp, err @@ -612,7 +615,7 @@ func (f *Fs) readMetaDataForPath(ctx context.Context, path string) (info *api.It opts.Path = strings.TrimSuffix(opts.Path, ":") err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) return info, resp, err } @@ -868,7 +871,7 @@ func (f *Fs) CreateDir(ctx context.Context, dirID, leaf string) (newID string, e } err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, &mkdir, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { //fmt.Printf("...Error %v\n", err) @@ -900,7 +903,7 @@ OUTER: var resp *http.Response err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, nil, &result) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return found, errors.Wrap(err, "couldn't list files") @@ -1037,7 +1040,7 @@ func (f *Fs) deleteObject(ctx context.Context, id string) error { return f.pacer.Call(func() (bool, error) { resp, err := f.srv.Call(ctx, &opts) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) } @@ -1193,7 +1196,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, var resp *http.Response err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, ©Req, nil) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return nil, err @@ -1286,7 +1289,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, var info api.Item err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, &move, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return nil, err @@ -1353,7 +1356,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string var info api.Item err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, &move, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return err @@ -1379,7 +1382,7 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) { var resp *http.Response err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, nil, &drive) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return nil, errors.Wrap(err, "about failed") @@ -1429,7 +1432,7 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, var result api.CreateShareLinkResponse err = f.pacer.Call(func() (bool, error) { resp, err = f.srv.CallJSON(ctx, &opts, &share, &result) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { fmt.Println(err) @@ -1474,7 +1477,7 @@ func (o *Object) deleteVersions(ctx context.Context) error { var versions api.VersionsResponse err := o.fs.pacer.Call(func() (bool, error) { resp, err := o.fs.srv.CallJSON(ctx, &opts, nil, &versions) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return err @@ -1501,7 +1504,7 @@ func (o *Object) deleteVersion(ctx context.Context, ID string) error { opts.NoResponse = true return o.fs.pacer.Call(func() (bool, error) { resp, err := o.fs.srv.Call(ctx, &opts) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) } @@ -1652,7 +1655,7 @@ func (o *Object) setModTime(ctx context.Context, modTime time.Time) (*api.Item, var info *api.Item err := o.fs.pacer.Call(func() (bool, error) { resp, err := o.fs.srv.CallJSON(ctx, &opts, &update, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) // Remove versions if required if o.fs.opt.NoVersions { @@ -1694,7 +1697,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read err = o.fs.pacer.Call(func() (bool, error) { resp, err = o.fs.srv.Call(ctx, &opts) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return nil, err @@ -1722,7 +1725,7 @@ func (o *Object) createUploadSession(ctx context.Context, modTime time.Time) (re err = errors.New(err.Error() + " (is it a OneNote file?)") } } - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) return response, err } @@ -1737,7 +1740,7 @@ func (o *Object) getPosition(ctx context.Context, url string) (pos int64, err er var resp *http.Response err = o.fs.pacer.Call(func() (bool, error) { resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &info) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return 0, err @@ -1797,11 +1800,11 @@ func (o *Object) uploadFragment(ctx context.Context, url string, start int64, to return true, errors.Wrapf(err, "retry this chunk skipping %d bytes", skip) } if err != nil { - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) } body, err = rest.ReadBody(resp) if err != nil { - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) } if resp.StatusCode == 200 || resp.StatusCode == 201 { // we are done :) @@ -1824,7 +1827,7 @@ func (o *Object) cancelUploadSession(ctx context.Context, url string) (err error var resp *http.Response err = o.fs.pacer.Call(func() (bool, error) { resp, err = o.fs.srv.Call(ctx, &opts) - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) return } @@ -1895,7 +1898,7 @@ func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, err = errors.New(err.Error() + " (is it a OneNote file?)") } } - return shouldRetry(resp, err) + return shouldRetry(ctx, resp, err) }) if err != nil { return nil, err diff --git a/backend/s3/s3.go b/backend/s3/s3.go index fc570413b..43b4f6984 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -1399,7 +1399,10 @@ var retryErrorCodes = []int{ //S3 is pretty resilient, and the built in retry handling is probably sufficient // as it should notice closed connections and timeouts which are the most likely // sort of failure modes -func (f *Fs) shouldRetry(err error) (bool, error) { +func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } // If this is an awserr object, try and extract more useful information to determine if we should retry if awsError, ok := err.(awserr.Error); ok { // Simple case, check the original embedded error in case it's generically retryable @@ -1411,7 +1414,7 @@ func (f *Fs) shouldRetry(err error) (bool, error) { // 301 if wrong region for bucket - can only update if running from a bucket if f.rootBucket != "" { if reqErr.StatusCode() == http.StatusMovedPermanently { - urfbErr := f.updateRegionForBucket(f.rootBucket) + urfbErr := f.updateRegionForBucket(ctx, f.rootBucket) if urfbErr != nil { fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr) return false, err @@ -1741,7 +1744,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { } // Gets the bucket location -func (f *Fs) getBucketLocation(bucket string) (string, error) { +func (f *Fs) getBucketLocation(ctx context.Context, bucket string) (string, error) { req := s3.GetBucketLocationInput{ Bucket: &bucket, } @@ -1749,7 +1752,7 @@ func (f *Fs) getBucketLocation(bucket string) (string, error) { var err error err = f.pacer.Call(func() (bool, error) { resp, err = f.c.GetBucketLocation(&req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return "", err @@ -1759,8 +1762,8 @@ func (f *Fs) getBucketLocation(bucket string) (string, error) { // Updates the region for the bucket by reading the region from the // bucket then updating the session. -func (f *Fs) updateRegionForBucket(bucket string) error { - region, err := f.getBucketLocation(bucket) +func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error { + region, err := f.getBucketLocation(ctx, bucket) if err != nil { return errors.Wrap(err, "reading bucket location failed") } @@ -1854,7 +1857,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck } } } - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { if awsErr, ok := err.(awserr.RequestFailure); ok { @@ -2001,7 +2004,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) var resp *s3.ListBucketsOutput err = f.pacer.Call(func() (bool, error) { resp, err = f.c.ListBucketsWithContext(ctx, &req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, err @@ -2116,7 +2119,7 @@ func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.HeadBucketWithContext(ctx, &req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err == nil { return true, nil @@ -2152,7 +2155,7 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) error { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.CreateBucketWithContext(ctx, &req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err == nil { fs.Infof(f, "Bucket %q created with ACL %q", bucket, f.opt.BucketACL) @@ -2182,7 +2185,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { } err := f.pacer.Call(func() (bool, error) { _, err := f.c.DeleteBucketWithContext(ctx, &req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err == nil { fs.Infof(f, "Bucket %q deleted", bucket) @@ -2242,7 +2245,7 @@ func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPa } return f.pacer.Call(func() (bool, error) { _, err := f.c.CopyObjectWithContext(ctx, req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) } @@ -2286,7 +2289,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst if err := f.pacer.Call(func() (bool, error) { var err error cout, err = f.c.CreateMultipartUploadWithContext(ctx, req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }); err != nil { return err } @@ -2302,7 +2305,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst UploadId: uid, RequestPayer: req.RequestPayer, }) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) })() @@ -2325,7 +2328,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst uploadPartReq.CopySourceRange = aws.String(calculateRange(partSize, partNum-1, numParts, srcSize)) uout, err := f.c.UploadPartCopyWithContext(ctx, uploadPartReq) if err != nil { - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) } parts = append(parts, &s3.CompletedPart{ PartNumber: &partNum, @@ -2347,7 +2350,7 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst RequestPayer: req.RequestPayer, UploadId: uid, }) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) } @@ -2578,7 +2581,7 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str reqCopy.Key = &bucketPath err = f.pacer.Call(func() (bool, error) { _, err = f.c.RestoreObject(&reqCopy) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { st.Status = err.Error() @@ -2626,7 +2629,7 @@ func (f *Fs) listMultipartUploads(ctx context.Context, bucket, key string) (uplo var resp *s3.ListMultipartUploadsOutput err = f.pacer.Call(func() (bool, error) { resp, err = f.c.ListMultipartUploads(&req) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrapf(err, "list multipart uploads bucket %q key %q", bucket, key) @@ -2801,7 +2804,7 @@ func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err err = o.fs.pacer.Call(func() (bool, error) { var err error resp, err = o.fs.c.HeadObjectWithContext(ctx, &req) - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) if err != nil { if awsErr, ok := err.(awserr.RequestFailure); ok { @@ -2957,7 +2960,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var err error httpReq.HTTPRequest = httpReq.HTTPRequest.WithContext(ctx) err = httpReq.Send() - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) if err, ok := err.(awserr.RequestFailure); ok { if err.Code() == "InvalidObjectState" { @@ -3016,7 +3019,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si err = f.pacer.Call(func() (bool, error) { var err error cout, err = f.c.CreateMultipartUploadWithContext(ctx, &mReq) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "multipart upload failed to initialise") @@ -3035,7 +3038,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si UploadId: uid, RequestPayer: req.RequestPayer, }) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if errCancel != nil { fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) @@ -3111,7 +3114,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq) if err != nil { if partNum <= int64(concurrency) { - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) } // retry all chunks once have done the first batch return true, err @@ -3151,7 +3154,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si RequestPayer: req.RequestPayer, UploadId: uid, }) - return f.shouldRetry(err) + return f.shouldRetry(ctx, err) }) if err != nil { return errors.Wrap(err, "multipart upload failed to finalise") @@ -3306,11 +3309,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var err error resp, err = o.fs.srv.Do(httpReq) if err != nil { - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) } body, err := rest.ReadBody(resp) if err != nil { - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) } if resp.StatusCode >= 200 && resp.StatusCode < 299 { return false, nil @@ -3361,7 +3364,7 @@ func (o *Object) Remove(ctx context.Context) error { } err := o.fs.pacer.Call(func() (bool, error) { _, err := o.fs.c.DeleteObjectWithContext(ctx, &req) - return o.fs.shouldRetry(err) + return o.fs.shouldRetry(ctx, err) }) return err } diff --git a/backend/swift/swift.go b/backend/swift/swift.go index 21c0d8d5a..1f1032915 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -291,7 +291,10 @@ var retryErrorCodes = []int{ // shouldRetry returns a boolean as to whether this err deserves to be // retried. It returns the err as a convenience -func shouldRetry(err error) (bool, error) { +func shouldRetry(ctx context.Context, err error) (bool, error) { + if fserrors.ContextError(ctx, &err) { + return false, err + } // If this is a swift.Error object extract the HTTP error code if swiftError, ok := err.(*swift.Error); ok { for _, e := range retryErrorCodes { @@ -307,7 +310,7 @@ func shouldRetry(err error) (bool, error) { // shouldRetryHeaders returns a boolean as to whether this err // deserves to be retried. It reads the headers passed in looking for // `Retry-After`. It returns the err as a convenience -func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) { +func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error) (bool, error) { if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 { if value := headers["Retry-After"]; value != "" { retryAfter, parseErr := strconv.Atoi(value) @@ -326,7 +329,7 @@ func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) { } } } - return shouldRetry(err) + return shouldRetry(ctx, err) } // parsePath parses a remote 'url' @@ -468,7 +471,7 @@ func NewFsWithConnection(ctx context.Context, opt *Options, name, root string, c err = f.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers info, rxHeaders, err = f.c.Object(ctx, f.rootContainer, encodedDirectory) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err == nil && info.ContentType != directoryMarkerContentType { newRoot := path.Dir(f.root) @@ -576,7 +579,7 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix var err error err = f.pacer.Call(func() (bool, error) { objects, err = f.c.Objects(ctx, container, opts) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err == nil { for i := range objects { @@ -661,7 +664,7 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err var containers []swift.Container err = f.pacer.Call(func() (bool, error) { containers, err = f.c.ContainersAll(ctx, nil) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "container listing failed") @@ -753,7 +756,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { var err error err = f.pacer.Call(func() (bool, error) { containers, err = f.c.ContainersAll(ctx, nil) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return nil, errors.Wrap(err, "container listing failed") @@ -805,7 +808,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error { err = f.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers _, rxHeaders, err = f.c.Container(ctx, container) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) } if err == swift.ContainerNotFound { @@ -815,7 +818,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error { } err = f.pacer.Call(func() (bool, error) { err = f.c.ContainerCreate(ctx, container, headers) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err == nil { fs.Infof(f, "Container %q created", container) @@ -836,7 +839,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { err := f.cache.Remove(container, func() error { err := f.pacer.Call(func() (bool, error) { err := f.c.ContainerDelete(ctx, container) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err == nil { fs.Infof(f, "Container %q removed", container) @@ -906,7 +909,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, err = f.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err != nil { return nil, err @@ -1041,7 +1044,7 @@ func (o *Object) readMetaData(ctx context.Context) (err error) { container, containerPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { info, h, err = o.fs.c.Object(ctx, container, containerPath) - return shouldRetryHeaders(h, err) + return shouldRetryHeaders(ctx, h, err) }) if err != nil { if err == swift.ObjectNotFound { @@ -1100,7 +1103,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { container, containerPath := o.split() return o.fs.pacer.Call(func() (bool, error) { err = o.fs.c.ObjectUpdate(ctx, container, containerPath, newHeaders) - return shouldRetry(err) + return shouldRetry(ctx, err) }) } @@ -1121,7 +1124,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers in, rxHeaders, err = o.fs.c.ObjectOpen(ctx, container, containerPath, !isRanging, headers) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) return } @@ -1211,7 +1214,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift. err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers _, rxHeaders, err = o.fs.c.Container(ctx, segmentsContainer) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err == swift.ContainerNotFound { headers := swift.Headers{} @@ -1220,7 +1223,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift. } err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.c.ContainerCreate(ctx, segmentsContainer, headers) - return shouldRetry(err) + return shouldRetry(ctx, err) }) } if err != nil { @@ -1267,7 +1270,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift. if err == nil { segmentInfos = append(segmentInfos, segmentPath) } - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err != nil { return "", err @@ -1281,7 +1284,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift. err = o.fs.pacer.Call(func() (bool, error) { var rxHeaders swift.Headers rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, emptyReader, true, "", contentType, headers) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err == nil { @@ -1356,7 +1359,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var rxHeaders swift.Headers err = o.fs.pacer.CallNoRetry(func() (bool, error) { rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, in, true, "", contentType, headers) - return shouldRetryHeaders(rxHeaders, err) + return shouldRetryHeaders(ctx, rxHeaders, err) }) if err != nil { return err @@ -1414,7 +1417,7 @@ func (o *Object) Remove(ctx context.Context) (err error) { // Remove file/manifest first err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.c.ObjectDelete(ctx, container, containerPath) - return shouldRetry(err) + return shouldRetry(ctx, err) }) if err != nil { return err diff --git a/backend/swift/swift_internal_test.go b/backend/swift/swift_internal_test.go index f0feab80d..c85355084 100644 --- a/backend/swift/swift_internal_test.go +++ b/backend/swift/swift_internal_test.go @@ -1,6 +1,7 @@ package swift import ( + "context" "testing" "time" @@ -32,6 +33,7 @@ func TestInternalUrlEncode(t *testing.T) { } func TestInternalShouldRetryHeaders(t *testing.T) { + ctx := context.Background() headers := swift.Headers{ "Content-Length": "64", "Content-Type": "text/html; charset=UTF-8", @@ -45,7 +47,7 @@ func TestInternalShouldRetryHeaders(t *testing.T) { // Short sleep should just do the sleep start := time.Now() - retry, gotErr := shouldRetryHeaders(headers, err) + retry, gotErr := shouldRetryHeaders(ctx, headers, err) dt := time.Since(start) assert.True(t, retry) assert.Equal(t, err, gotErr) @@ -54,7 +56,7 @@ func TestInternalShouldRetryHeaders(t *testing.T) { // Long sleep should return RetryError headers["Retry-After"] = "3600" start = time.Now() - retry, gotErr = shouldRetryHeaders(headers, err) + retry, gotErr = shouldRetryHeaders(ctx, headers, err) dt = time.Since(start) assert.True(t, dt < time.Second) assert.False(t, retry)