rclone/backend/hidrive/helpers.go

880 lines
29 KiB
Go

package hidrive
// This file is for helper-functions which may provide more general and
// specialized functionality than the generic interfaces.
// There are two sections:
// 1. methods bound to Fs
// 2. other functions independent from Fs used throughout the package
// NOTE: Functions accessing paths expect any relative paths
// to be resolved prior to execution with resolvePath(...).
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"path"
"strconv"
"sync"
"time"
"github.com/rclone/rclone/backend/hidrive/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/lib/readers"
"github.com/rclone/rclone/lib/rest"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const (
// MaximumUploadBytes represents the maximum amount of bytes
// a single upload-operation will support.
// The value equals 2^31 - 1.
MaximumUploadBytes = 2147483647 // = 2GiB - 1
// iterationChunkSize represents the chunk size used to iterate directory contents.
// iterateOverDirectory passes it to paginateDirectoryAccess as the limit,
// i.e. as the number of entries requested per call.
iterationChunkSize = 5000
)
var (
// retryErrorCodes is a slice of error codes that we will always retry.
// shouldRetry hands this list to fserrors.ShouldRetryHTTP.
retryErrorCodes = []int{
429, // Too Many Requests
500, // Internal Server Error
502, // Bad Gateway
503, // Service Unavailable
504, // Gateway Timeout
509, // Bandwidth Limit Exceeded
}
// ErrorFileExists is returned when a query tries to create a file
// that already exists.
// createFile returns it when the request fails with HTTP status 409.
ErrorFileExists = errors.New("destination file already exists")
)
// MemberType represents the possible types of entries a directory can contain.
// iterateOverDirectory sends the value verbatim as the "members" query parameter.
type MemberType string
// possible values for MemberType
const (
// AllMembers is sent as "all"; presumably it selects entries of every type.
AllMembers MemberType = "all"
// NoMembers is sent as "none"; presumably it selects no entries at all.
NoMembers MemberType = "none"
// DirectoryMembers uses the API's object type name for directories.
DirectoryMembers MemberType = api.HiDriveObjectTypeDirectory
// FileMembers uses the API's object type name for files.
FileMembers MemberType = api.HiDriveObjectTypeFile
// SymlinkMembers uses the API's object type name for symlinks.
SymlinkMembers MemberType = api.HiDriveObjectTypeSymlink
)
// SortByField represents possible fields to sort entries of a directory by.
// iterateOverDirectory appends each given value to the comma-separated
// "sort" query parameter.
type SortByField string
// possible values for SortByField
const (
// descendingSort is the prefix that turns a field name into
// its descending variant (see the ...Descending values below).
descendingSort string = "-"
// SortByName sorts by the "name" field.
SortByName SortByField = "name"
// SortByModTime sorts by the "mtime" field.
SortByModTime SortByField = "mtime"
// SortByObjectType sorts by the "type" field.
SortByObjectType SortByField = "type"
// SortBySize sorts by the "size" field.
SortBySize SortByField = "size"
// SortByNameDescending is SortByName prefixed with descendingSort ("-name").
SortByNameDescending SortByField = SortByField(descendingSort) + SortByName
// SortByModTimeDescending is SortByModTime prefixed with descendingSort ("-mtime").
SortByModTimeDescending SortByField = SortByField(descendingSort) + SortByModTime
// SortByObjectTypeDescending is SortByObjectType prefixed with descendingSort ("-type").
SortByObjectTypeDescending SortByField = SortByField(descendingSort) + SortByObjectType
// SortBySizeDescending is SortBySize prefixed with descendingSort ("-size").
SortBySizeDescending SortByField = SortByField(descendingSort) + SortBySize
)
var (
// Unsorted disables sorting and can therefore not be combined with other values.
// It consists of the single value "none".
Unsorted = []SortByField{"none"}
// DefaultSorted does not specify how to sort and
// therefore implies the default sort order.
// Because it is empty, iterateOverDirectory adds no "sort" parameter for it.
DefaultSorted = []SortByField{}
)
// CopyOrMoveOperationType represents the possible types of copy- and move-operations.
// copyOrMove uses it to choose the endpoint suffix of the request.
type CopyOrMoveOperationType int
// possible values for CopyOrMoveOperationType
const (
// MoveOriginal makes copyOrMove use the ".../move" endpoint.
MoveOriginal CopyOrMoveOperationType = iota
// CopyOriginal makes copyOrMove use the ".../copy" endpoint.
CopyOriginal
// CopyOriginalPreserveModTime makes copyOrMove use the ".../copy" endpoint
// and additionally set the "preserve_mtime" parameter to true.
CopyOriginalPreserveModTime
)
// OnExistAction represents possible actions the API should take,
// when a request tries to create a path that already exists.
//
// NOTE: The helpers in this file only forward the value
// as the "on_exist" parameter in specific cases:
// createDirectory and createFile forward AutoNameOnExist only,
// copyOrMove forwards AutoNameOnExist and,
// when operating on files, OverwriteOnExist.
// In all other cases the parameter is omitted from the request.
type OnExistAction string
// possible values for OnExistAction
const (
// IgnoreOnExist instructs the API not to execute
// the request in case of a conflict, but to return an error.
// The helpers in this file never send this value explicitly;
// they omit the "on_exist" parameter instead.
IgnoreOnExist OnExistAction = "ignore"
// AutoNameOnExist instructs the API to automatically rename
// any conflicting request-objects.
AutoNameOnExist OnExistAction = "autoname"
// OverwriteOnExist instructs the API to overwrite any conflicting files.
// This can only be used, if the request operates on files directly.
// (For example when moving/copying a file.)
// For most requests this action will simply be ignored.
OverwriteOnExist OnExistAction = "overwrite"
)
// shouldRetry reports whether the request that produced resp and err
// deserves another attempt.
//
// If the response indicates an authorization failure (status 401 together
// with a Www-Authenticate header), this tries to expire the current token;
// when that succeeds the request is retried.
// Otherwise the decision is delegated to the generic retry checks
// using retryErrorCodes.
//
// The error is returned alongside the decision as a convenience.
func (f *Fs) shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
	// A context error is final and is never retried.
	if fserrors.ContextError(ctx, &err) {
		return false, err
	}

	if resp != nil {
		unauthorized := resp.StatusCode == 401 || isHTTPError(err, 401)
		challenged := len(resp.Header["Www-Authenticate"]) > 0
		if unauthorized && challenged {
			fs.Debugf(f, "Token might be invalid: %v", err)
			// Only retry right away if the token could actually be expired;
			// otherwise fall through to the generic checks below.
			if f.tokenRenewer != nil && f.tokenRenewer.Expire() == nil {
				return true, err
			}
		}
	}

	retry := fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes)
	return retry, err
}
// resolvePath turns the given (relative) path into a path suitable for API-calls.
// The result is the configured root-prefix, the root of the fs and
// the encoded form of objectPath joined together.
//
// Any relative path handed to a function that accesses paths
// should be passed through this first!
func (f *Fs) resolvePath(objectPath string) string {
	encoded := f.opt.Enc.FromStandardPath(objectPath)
	return path.Join(f.opt.RootPrefix, f.root, encoded)
}
// iterateOverDirectory calls the given function callback
// on each item found in a given directory.
//
// searchOnly is sent as the "members" parameter to restrict the kind of entries.
// fields lists the fields requested for each member
// (they are added with the prefix "members.").
// sortBy lists the sort criteria; each is appended to the "sort" parameter.
//
// The name of every item is converted to its standard form
// before the item is handed to callback.
// Every invocation of callback receives a pointer to a distinct item,
// so the pointer stays valid and unchanged if callback retains it.
//
// If callback ever returns true then this exits early with found = true.
func (f *Fs) iterateOverDirectory(ctx context.Context, directory string, searchOnly MemberType, callback func(*api.HiDriveObject) bool, fields []string, sortBy []SortByField) (found bool, err error) {
	parameters := api.NewQueryParameters()
	parameters.SetPath(directory)
	parameters.AddFields("members.", fields...)
	parameters.AddFields("", api.DirectoryContentFields...)
	parameters.Set("members", string(searchOnly))
	for _, v := range sortBy {
		// The explicit conversion is necessary for each element.
		parameters.AddList("sort", ",", string(v))
	}

	opts := rest.Opts{
		Method:     "GET",
		Path:       "/dir",
		Parameters: parameters.Values,
	}

	iterateContent := func(result *api.DirectoryContent, err error) (bool, error) {
		if err != nil {
			return false, err
		}
		// Index into the slice instead of ranging by value:
		// taking the address of a range variable would hand the same pointer
		// to every callback under pre-1.22 loop semantics and
		// copies each entry needlessly.
		for i := range result.Entries {
			item := &result.Entries[i]
			item.Name = f.opt.Enc.ToStandardName(item.Name)
			if callback(item) {
				return true, nil
			}
		}
		return false, nil
	}
	return f.paginateDirectoryAccess(ctx, &opts, iterationChunkSize, 0, iterateContent)
}
// paginateDirectoryAccess repeatedly executes the request described by opts,
// which is expected to yield api.DirectoryContent, one page at a time.
// Each request sets the "limit" parameter to "<offset>,<limit>",
// beginning at the given offset.
//
// callback is invoked once per request with the decoded page and
// the error of that request (if any).
// If callback ever returns true then this exits early with found = true.
// If callback ever returns an error then this exits early with that error.
//
// Pagination ends once the offset has reached the reported total count or
// a page contained fewer entries than limit.
func (f *Fs) paginateDirectoryAccess(ctx context.Context, opts *rest.Opts, limit int64, offset int64, callback func(*api.DirectoryContent, error) (bool, error)) (found bool, err error) {
	for {
		window := strconv.FormatInt(offset, 10) + "," + strconv.FormatInt(limit, 10)
		opts.Parameters.Set("limit", window)

		var (
			page api.DirectoryContent
			resp *http.Response
		)
		err = f.pacer.Call(func() (bool, error) {
			resp, err = f.srv.CallJSON(ctx, opts, nil, &page)
			return f.shouldRetry(ctx, resp, err)
		})

		// The callback also gets to see (and decide about) request errors.
		found, err = callback(&page, err)
		if found || err != nil {
			return found, err
		}

		received := int64(len(page.Entries))
		offset += received
		reachedEnd := offset >= page.TotalCount
		shortPage := limit > received
		if reachedEnd || shortPage {
			return false, nil
		}
	}
}
// fetchMetadataForPath requests the metadata of the object at the given path.
// fields lists the metadata fields to request.
// On success the decoded api-object is returned, otherwise the error of the request.
func (f *Fs) fetchMetadataForPath(ctx context.Context, path string, fields []string) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.SetPath(path)
	query.AddFields("", fields...)

	request := rest.Opts{
		Method:     "GET",
		Path:       "/meta",
		Parameters: query.Values,
	}

	metadata := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.CallJSON(ctx, &request, nil, metadata)
		return f.shouldRetry(ctx, resp, err)
	})
	if callErr != nil {
		return nil, callErr
	}
	return metadata, nil
}
// copyOrMove copies or moves a directory or file
// from the source-path to the destination-path and
// returns the resulting api-object if successful.
//
// The endpoint is "/dir" or "/file" (depending on isDirectory)
// followed by "/move" or "/copy" (depending on operationType).
// CopyOriginalPreserveModTime additionally sets "preserve_mtime" to true.
// onExist is only forwarded for AutoNameOnExist and,
// if the operation targets a file, for OverwriteOnExist.
//
// The operation will only be successful
// if the parent-directory of the destination-path exists.
//
// NOTE: Use the explicit methods instead of directly invoking this method.
// (Those are: copyDirectory, moveDirectory, copyFile, moveFile.)
func (f *Fs) copyOrMove(ctx context.Context, isDirectory bool, operationType CopyOrMoveOperationType, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.Set("src", source)
	query.Set("dst", destination)

	switch onExist {
	case AutoNameOnExist:
		query.Set("on_exist", string(onExist))
	case OverwriteOnExist:
		// Overwriting is only passed on when operating on files.
		if !isDirectory {
			query.Set("on_exist", string(onExist))
		}
	}

	endpoint := "/file"
	if isDirectory {
		endpoint = "/dir"
	}

	if operationType == CopyOriginalPreserveModTime {
		query.Set("preserve_mtime", strconv.FormatBool(true))
	}
	switch operationType {
	case MoveOriginal:
		endpoint += "/move"
	case CopyOriginal, CopyOriginalPreserveModTime:
		endpoint += "/copy"
	}

	request := rest.Opts{
		Method:     "POST",
		Path:       endpoint,
		Parameters: query.Values,
	}

	object := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.CallJSON(ctx, &request, nil, object)
		return f.shouldRetry(ctx, resp, err)
	})
	if callErr != nil {
		return nil, callErr
	}
	return object, nil
}
// moveDirectory relocates the directory found at the source-path
// to the destination-path by delegating to copyOrMove and
// returns the resulting api-object if successful.
//
// The operation will only be successful
// if the parent-directory of the destination-path exists.
func (f *Fs) moveDirectory(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
	const isDirectory = true
	return f.copyOrMove(ctx, isDirectory, MoveOriginal, source, destination, onExist)
}
// copyFile duplicates the file found at the source-path
// at the destination-path by delegating to copyOrMove and
// returns the resulting api-object if successful.
// The copy is requested with CopyOriginalPreserveModTime.
//
// The operation will only be successful
// if the parent-directory of the destination-path exists.
//
// NOTE: This operation will expand sparse areas in the content of the source-file
// to blocks of 0-bytes in the destination-file.
func (f *Fs) copyFile(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
	const isDirectory = false
	return f.copyOrMove(ctx, isDirectory, CopyOriginalPreserveModTime, source, destination, onExist)
}
// moveFile relocates the file found at the source-path
// to the destination-path by delegating to copyOrMove and
// returns the resulting api-object if successful.
//
// The operation will only be successful
// if the parent-directory of the destination-path exists.
//
// NOTE: This operation may expand sparse areas in the content of the source-file
// to blocks of 0-bytes in the destination-file.
func (f *Fs) moveFile(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
	const isDirectory = false
	return f.copyOrMove(ctx, isDirectory, MoveOriginal, source, destination, onExist)
}
// createDirectory creates the directory at the given path and
// returns the resulting api-object if successful.
// onExist is only forwarded to the API if it is AutoNameOnExist.
//
// The directory will only be created if its parent-directory exists.
// This returns fs.ErrorDirNotFound if the parent-directory is not found (HTTP 404).
// This returns fs.ErrorDirExists if the directory already exists (HTTP 409).
func (f *Fs) createDirectory(ctx context.Context, directory string, onExist OnExistAction) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.SetPath(directory)
	if onExist == AutoNameOnExist {
		query.Set("on_exist", string(onExist))
	}

	request := rest.Opts{
		Method:     "POST",
		Path:       "/dir",
		Parameters: query.Values,
	}

	created := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.CallJSON(ctx, &request, nil, created)
		return f.shouldRetry(ctx, resp, err)
	})

	if callErr == nil {
		return created, nil
	}
	// Translate the well-known API failures into the generic fs errors.
	if isHTTPError(callErr, 404) {
		return nil, fs.ErrorDirNotFound
	}
	if isHTTPError(callErr, 409) {
		return nil, fs.ErrorDirExists
	}
	return nil, callErr
}
// createDirectories creates the directory at the given path
// along with any missing parent directories and
// returns the resulting api-object (of the created directory) if successful.
//
// The directory is created directly first; only if that fails with
// fs.ErrorDirNotFound are the parents created (recursively) and
// the creation attempted once more.
//
// This returns fs.ErrorDirExists if the directory already exists.
// This returns fs.ErrorDirNotFound if the directory cannot be created
// even though the path has no further parent to create.
//
// If an error occurs while the parent directories are being created,
// any directories already created will NOT be deleted again.
func (f *Fs) createDirectories(ctx context.Context, directory string, onExist OnExistAction) (*api.HiDriveObject, error) {
	result, err := f.createDirectory(ctx, directory, onExist)
	if err == nil {
		return result, nil
	}
	if err != fs.ErrorDirNotFound {
		return nil, err
	}

	parentDirectory := path.Dir(directory)
	if parentDirectory == directory {
		// path.Dir maps "/" and "." onto themselves, so there is no
		// further parent to create. Stop here instead of recursing forever.
		return nil, err
	}

	_, err = f.createDirectories(ctx, parentDirectory, onExist)
	if err != nil && err != fs.ErrorDirExists {
		return nil, err
	}
	// NOTE: Ignoring fs.ErrorDirExists does no harm,
	// since it does not mean the child directory cannot be created.
	return f.createDirectory(ctx, directory, onExist)
}
// deleteDirectory removes the directory at the given path.
//
// recursive is forwarded to the API as the "recursive" parameter:
// If recursive is false, the directory will only be deleted if it is empty.
// If recursive is true, the directory will be deleted regardless of its content.
//
// This returns fs.ErrorDirNotFound if the directory is not found (HTTP 404).
// This returns fs.ErrorDirectoryNotEmpty if the directory is not empty and
// recursive is false (HTTP 409).
func (f *Fs) deleteDirectory(ctx context.Context, directory string, recursive bool) error {
	query := api.NewQueryParameters()
	query.SetPath(directory)
	query.Set("recursive", strconv.FormatBool(recursive))

	request := rest.Opts{
		Method:     "DELETE",
		Path:       "/dir",
		Parameters: query.Values,
		NoResponse: true,
	}

	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.Call(ctx, &request)
		return f.shouldRetry(ctx, resp, err)
	})

	if isHTTPError(callErr, 404) {
		return fs.ErrorDirNotFound
	}
	if isHTTPError(callErr, 409) {
		return fs.ErrorDirectoryNotEmpty
	}
	return callErr
}
// deleteObject removes the object/file at the given path.
//
// This returns fs.ErrorObjectNotFound if the object is not found (HTTP 404).
// Any other error of the request is returned unchanged.
func (f *Fs) deleteObject(ctx context.Context, path string) error {
	query := api.NewQueryParameters()
	query.SetPath(path)

	request := rest.Opts{
		Method:     "DELETE",
		Path:       "/file",
		Parameters: query.Values,
		NoResponse: true,
	}

	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.Call(ctx, &request)
		return f.shouldRetry(ctx, resp, err)
	})
	if isHTTPError(callErr, 404) {
		return fs.ErrorObjectNotFound
	}
	return callErr
}
// createFile creates a file at the given path
// filled with the content of the io.ReadSeeker.
// This guarantees that existing files will not be overwritten.
// The maximum size of the content is limited by MaximumUploadBytes.
// The io.ReadSeeker should be resettable by seeking to its start,
// because it is rewound before every attempt of the request.
// If modTime is not the zero time instant,
// it will be set as the file's modification time after the operation.
// onExist is only forwarded to the API if it is AutoNameOnExist.
//
// This returns fs.ErrorDirNotFound
// if the parent directory of the file is not found (HTTP 404).
// This returns ErrorFileExists
// if a file already exists at the specified path (HTTP 409).
func (f *Fs) createFile(ctx context.Context, path string, content io.ReadSeeker, modTime time.Time, onExist OnExistAction) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.SetFileInDirectory(path)
	if onExist == AutoNameOnExist {
		query.Set("on_exist", string(onExist))
	}
	if !modTime.IsZero() {
		if err := query.SetTime("mtime", modTime); err != nil {
			return nil, err
		}
	}

	request := rest.Opts{
		Method:      "POST",
		Path:        "/file",
		Body:        content,
		ContentType: "application/octet-stream",
		Parameters:  query.Values,
	}

	created := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		// Reset the reading index (in case this is a retry).
		if _, seekErr := content.Seek(0, io.SeekStart); seekErr != nil {
			return false, seekErr
		}
		resp, err := f.srv.CallJSON(ctx, &request, nil, created)
		return f.shouldRetry(ctx, resp, err)
	})

	if callErr == nil {
		return created, nil
	}
	if isHTTPError(callErr, 404) {
		return nil, fs.ErrorDirNotFound
	}
	if isHTTPError(callErr, 409) {
		return nil, ErrorFileExists
	}
	return nil, callErr
}
// overwriteFile replaces the content of the file at the given path
// with the content of the io.ReadSeeker.
// If the file does not exist it will be created.
// The maximum size of the content is limited by MaximumUploadBytes.
// The io.ReadSeeker should be resettable by seeking to its start,
// because it is rewound before every attempt of the request.
// If modTime is not the zero time instant,
// it will be set as the file's modification time after the operation.
//
// This returns fs.ErrorDirNotFound
// if the parent directory of the file is not found (HTTP 404).
func (f *Fs) overwriteFile(ctx context.Context, path string, content io.ReadSeeker, modTime time.Time) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.SetFileInDirectory(path)
	if !modTime.IsZero() {
		if err := query.SetTime("mtime", modTime); err != nil {
			return nil, err
		}
	}

	request := rest.Opts{
		Method:      "PUT",
		Path:        "/file",
		Body:        content,
		ContentType: "application/octet-stream",
		Parameters:  query.Values,
	}

	written := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		// Reset the reading index (in case this is a retry).
		if _, seekErr := content.Seek(0, io.SeekStart); seekErr != nil {
			return false, seekErr
		}
		resp, err := f.srv.CallJSON(ctx, &request, nil, written)
		return f.shouldRetry(ctx, resp, err)
	})

	if callErr == nil {
		return written, nil
	}
	if isHTTPError(callErr, 404) {
		return nil, fs.ErrorDirNotFound
	}
	return nil, callErr
}
// uploadFileChunked replaces the content of the existing file at the given path
// with the content of the io.Reader.
// Returns okSize as reported by updateFileChunked (called with offset 0):
// the size of the leading part of the file that was written successfully,
// stopping before the first failed write.
// If nothing was written this will be 0.
// Returns the resulting api-object if successful.
//
// The content is uploaded as multiple chunks of the given size in parallel.
// Therefore this can be used to upload files of any size efficiently.
// The number of parallel transfers is limited by transferLimit
// which should be larger than 0.
// Once all chunks are uploaded the file is resized to okSize.
// If modTime is not the zero time instant,
// it will be set as the file's modification time by that final resize.
//
// NOTE: This method uses updateFileChunked and may create sparse files,
// if the upload of a chunk fails unexpectedly.
// See note about sparse files in patchFile.
// If any of the uploads fail, the process will be aborted and
// the first error that occurred will be returned.
// This is not an atomic operation,
// therefore if the upload fails the file may be partially modified.
//
// This returns fs.ErrorObjectNotFound if the object is not found.
func (f *Fs) uploadFileChunked(ctx context.Context, path string, content io.Reader, modTime time.Time, chunkSize int, transferLimit int64) (okSize uint64, info *api.HiDriveObject, err error) {
	okSize, err = f.updateFileChunked(ctx, path, content, 0, chunkSize, transferLimit)
	if err != nil {
		// Leave the file untouched by the resize if the upload failed.
		return okSize, nil, err
	}
	info, err = f.resizeFile(ctx, path, okSize, modTime)
	return okSize, info, err
}
// updateFileChunked updates the content of the existing file at the given path
// starting at the given offset.
// Returns okSize, the size of the leading continuous part of the file
// (including the content up to offset, which is presumed to be correct already)
// that does not contain any failed chunk.
// If nothing was written and offset is 0 this will be 0.
//
// Replaces the file contents starting from the given byte offset
// with the content of the io.Reader.
// If the offset is beyond the file end, the file is extended up to the offset.
//
// The upload is done in multiple chunks of the given size in parallel.
// Therefore this can be used to upload files of any size efficiently.
// The number of parallel transfers is limited by transferLimit
// which should be larger than 0.
//
// NOTE: Because it is inefficient to set the modification time with every chunk,
// setting it to a specific value must be done in a separate request
// after this operation finishes.
//
// NOTE: This method uses patchFile and may create sparse files,
// especially if the upload of a chunk fails unexpectedly.
// See note about sparse files in patchFile.
// If any of the uploads fail, the process will be aborted and
// the first error that occurred will be returned.
// If no upload failed but reading the content (or waiting for a free
// transfer slot) failed, that error is returned instead.
// This is not an atomic operation,
// therefore if the upload fails the file may be partially modified.
//
// This returns fs.ErrorObjectNotFound if the object is not found.
func (f *Fs) updateFileChunked(ctx context.Context, path string, content io.Reader, offset uint64, chunkSize int, transferLimit int64) (okSize uint64, err error) {
	var (
		okChunksMu sync.Mutex // protects the variables below
		okChunks   []ranges.Range
	)
	g, gCtx := errgroup.WithContext(ctx)
	transferSemaphore := semaphore.NewWeighted(transferLimit)

	var readErr error
	startMoreTransfers := true
	zeroTime := time.Time{}
	for chunk := uint64(0); startMoreTransfers; chunk++ {
		// Acquire semaphore to limit number of transfers in parallel.
		readErr = transferSemaphore.Acquire(gCtx, 1)
		if readErr != nil {
			break
		}

		// Read a chunk of data.
		// The result must be assigned to the outer readErr with "=":
		// declaring it with ":=" would shadow readErr inside the loop and
		// silently drop the read error, making a truncated upload look successful.
		var (
			chunkReader io.Reader
			bytesRead   int
		)
		chunkReader, bytesRead, readErr = readerForChunk(content, chunkSize)
		if bytesRead < chunkSize {
			// A short read means the content is exhausted (or failed).
			startMoreTransfers = false
		}
		if readErr != nil || bytesRead <= 0 {
			break
		}

		// Transfer the chunk.
		chunkOffset := uint64(chunkSize)*chunk + offset
		g.Go(func() error {
			// After this upload is done,
			// signal that another transfer can be started.
			defer transferSemaphore.Release(1)
			uploadErr := f.patchFile(gCtx, path, cachedReader(chunkReader), chunkOffset, zeroTime)
			if uploadErr == nil {
				// Remember successfully written chunks.
				okChunksMu.Lock()
				okChunks = append(okChunks, ranges.Range{Pos: int64(chunkOffset), Size: int64(bytesRead)})
				okChunksMu.Unlock()
				fs.Debugf(f, "Done uploading chunk of size %v at offset %v.", bytesRead, chunkOffset)
			} else {
				fs.Infof(f, "Error while uploading chunk at offset %v. Error is %v.", chunkOffset, uploadErr)
			}
			return uploadErr
		})
	}

	if readErr != nil {
		// Log the error in case it is later ignored because of an upload-error.
		fs.Infof(f, "Error while reading/preparing to upload a chunk. Error is %v.", readErr)
	}

	// Wait for all transfers that have been started; no goroutine is
	// appending to okChunks any more once this returns.
	err = g.Wait()

	// Compute the first continuous range of the file content,
	// which does not contain any failed chunks.
	// Do not forget to add the file content up to the starting offset,
	// which is presumed to be already correct.
	rs := ranges.Ranges{}
	rs.Insert(ranges.Range{Pos: 0, Size: int64(offset)})
	for _, chunkRange := range okChunks {
		rs.Insert(chunkRange)
	}
	if len(rs) > 0 && rs[0].Pos == 0 {
		okSize = uint64(rs[0].Size)
	}

	// An upload-error takes precedence over a read-error.
	if err != nil {
		return okSize, err
	}
	if readErr != nil {
		return okSize, readErr
	}
	return okSize, nil
}
// patchFile writes the content of the io.ReadSeeker into
// the existing file at the given path, starting at the given byte offset.
//
// If the offset is beyond the file end, the file is extended up to the offset.
// The maximum size of the update is limited by MaximumUploadBytes.
// The io.ReadSeeker should be resettable by seeking to its start,
// because it is rewound before every attempt of the request.
// If modTime is not the zero time instant,
// it will be set as the file's modification time after the operation.
//
// A response with HTTP status 423 is always retried;
// all other outcomes are judged by shouldRetry.
//
// NOTE: By extending the file up to the offset this may create sparse files,
// which allocate less space on the file system than their apparent size indicates,
// since holes between data chunks are "real" holes
// and not regions made up of consecutive 0-bytes.
// Subsequent operations (such as copying data)
// usually expand the holes into regions of 0-bytes.
//
// This returns fs.ErrorObjectNotFound if the object is not found (HTTP 404).
func (f *Fs) patchFile(ctx context.Context, path string, content io.ReadSeeker, offset uint64, modTime time.Time) error {
	query := api.NewQueryParameters()
	query.SetPath(path)
	query.Set("offset", strconv.FormatUint(offset, 10))
	if !modTime.IsZero() {
		if err := query.SetTime("mtime", modTime); err != nil {
			return err
		}
	}

	request := rest.Opts{
		Method:      "PATCH",
		Path:        "/file",
		Body:        content,
		ContentType: "application/octet-stream",
		Parameters:  query.Values,
		NoResponse:  true,
	}

	callErr := f.pacer.Call(func() (bool, error) {
		// Reset the reading index (in case this is a retry).
		if _, seekErr := content.Seek(0, io.SeekStart); seekErr != nil {
			return false, seekErr
		}
		resp, err := f.srv.Call(ctx, &request)
		if isHTTPError(err, 423) {
			return true, err
		}
		return f.shouldRetry(ctx, resp, err)
	})
	if isHTTPError(callErr, 404) {
		return fs.ErrorObjectNotFound
	}
	return callErr
}
// resizeFile changes the existing file at the given path to be of the given size
// and returns the resulting api-object if successful.
//
// If the given size is smaller than the current filesize,
// the file is cut/truncated at that position.
// If the given size is larger, the file is extended up to that position.
// If modTime is not the zero time instant,
// it will be set as the file's modification time after the operation.
//
// NOTE: By extending the file this may create sparse files,
// which allocate less space on the file system than their apparent size indicates,
// since holes between data chunks are "real" holes
// and not regions made up of consecutive 0-bytes.
// Subsequent operations (such as copying data)
// usually expand the holes into regions of 0-bytes.
//
// This returns fs.ErrorObjectNotFound if the object is not found (HTTP 404).
func (f *Fs) resizeFile(ctx context.Context, path string, size uint64, modTime time.Time) (*api.HiDriveObject, error) {
	query := api.NewQueryParameters()
	query.SetPath(path)
	query.Set("size", strconv.FormatUint(size, 10))
	if !modTime.IsZero() {
		if err := query.SetTime("mtime", modTime); err != nil {
			return nil, err
		}
	}

	request := rest.Opts{
		Method:     "POST",
		Path:       "/file/truncate",
		Parameters: query.Values,
	}

	resized := new(api.HiDriveObject)
	callErr := f.pacer.Call(func() (bool, error) {
		resp, err := f.srv.CallJSON(ctx, &request, nil, resized)
		return f.shouldRetry(ctx, resp, err)
	})

	if callErr == nil {
		return resized, nil
	}
	if isHTTPError(callErr, 404) {
		return nil, fs.ErrorObjectNotFound
	}
	return nil, callErr
}
// ------------------------------------------------------------
// isHTTPError compares the numerical status code
// of an api.Error to the given HTTP status.
//
// The api.Error is looked up anywhere in the error chain of err,
// so errors that wrap an api.Error are recognised as well.
//
// If the given error is not (and does not wrap) an api.Error or
// a numerical status code could not be determined, this returns false.
// Otherwise this returns whether the status code of the error is equal to the given status.
func isHTTPError(err error, status int64) bool {
	var apiErr *api.Error
	// errors.As reports false for a nil err;
	// the extra nil check protects against a typed nil pointer in the chain.
	if !errors.As(err, &apiErr) || apiErr == nil {
		return false
	}
	errStatus, decodeErr := apiErr.Code.Int64()
	return decodeErr == nil && errStatus == status
}
// createHiDriveScopes builds the oauth-scopes
// for the given user-role and access-permissions.
//
// If both arguments are set, the result holds the single scope "access,role".
// If only one of them is set, the result holds just that value.
// If both are empty, the result is an empty (non-nil) slice.
func createHiDriveScopes(role string, access string) []string {
	hasRole, hasAccess := role != "", access != ""
	if hasRole && hasAccess {
		return []string{access + "," + role}
	}
	if hasRole {
		return []string{role}
	}
	if hasAccess {
		return []string{access}
	}
	return []string{}
}
// cachedReader returns a version of the reader that can be reset using Seek.
//
// A *bytes.Reader or *readers.RepeatableReader is returned as it is,
// any other reader is wrapped in a new readers.RepeatableReader.
func cachedReader(reader io.Reader) io.ReadSeeker {
	switch seekable := reader.(type) {
	case *bytes.Reader:
		return seekable
	case *readers.RepeatableReader:
		return seekable
	default:
		return readers.NewRepeatableReader(reader)
	}
}
// readerForChunk reads up to length bytes from reader into memory
// (after unwrapping any accounting from reader).
// Returns a new io.Reader (chunkReader) serving exactly the bytes read,
// with the accounting put back around it,
// and the number of bytes that have been read from reader.
//
// Reaching the end of reader early is not treated as an error;
// it merely results in bytesRead being smaller than length.
// For any other read error chunkReader is nil.
func readerForChunk(reader io.Reader, length int) (chunkReader io.Reader, bytesRead int, err error) {
	// Unwrap any accounting from the input if present.
	unwrapped, rewrap := accounting.UnWrap(reader)

	// Read a chunk of data.
	chunk := make([]byte, length)
	bytesRead, err = io.ReadFull(unwrapped, chunk)
	switch err {
	case nil, io.EOF, io.ErrUnexpectedEOF:
		// A short (or empty) read is a regular outcome here.
	default:
		return nil, bytesRead, err
	}

	// Serve only the bytes actually read and
	// use rewrap to put any accounting back for chunkReader.
	return rewrap(bytes.NewReader(chunk[:bytesRead])), bytesRead, nil
}