From 4c1cb0622e39a2a91edcab28ebea95c77f186b9d Mon Sep 17 00:00:00 2001 From: Georg Welzel Date: Sat, 8 Jun 2024 19:07:01 -0700 Subject: [PATCH] backend: pcloud: Implement OpenWriterAt feature --- backend/pcloud/api/types.go | 31 ++++++ backend/pcloud/pcloud.go | 88 +++++++++++---- backend/pcloud/writer_at.go | 216 ++++++++++++++++++++++++++++++++++++ 3 files changed, 315 insertions(+), 20 deletions(-) create mode 100644 backend/pcloud/writer_at.go diff --git a/backend/pcloud/api/types.go b/backend/pcloud/api/types.go index c1b5dc217..6f2d6361b 100644 --- a/backend/pcloud/api/types.go +++ b/backend/pcloud/api/types.go @@ -109,6 +109,37 @@ type Hashes struct { SHA256 string `json:"sha256"` } +// FileTruncateResponse is the response from /file_truncate +type FileTruncateResponse struct { + Error +} + +// FileCloseResponse is the response from /file_close +type FileCloseResponse struct { + Error +} + +// FileOpenResponse is the response from /file_open +type FileOpenResponse struct { + Error + Fileid int64 `json:"fileid"` + FileDescriptor int64 `json:"fd"` +} + +// FileChecksumResponse is the response from /file_checksum +type FileChecksumResponse struct { + Error + MD5 string `json:"md5"` + SHA1 string `json:"sha1"` + SHA256 string `json:"sha256"` +} + +// FilePWriteResponse is the response from /file_pwrite +type FilePWriteResponse struct { + Error + Bytes int64 `json:"bytes"` +} + // UploadFileResponse is the response from /uploadfile type UploadFileResponse struct { Error diff --git a/backend/pcloud/pcloud.go b/backend/pcloud/pcloud.go index f17017cc1..dc475cb9d 100644 --- a/backend/pcloud/pcloud.go +++ b/backend/pcloud/pcloud.go @@ -14,6 +14,7 @@ import ( "net/http" "net/url" "path" + "strconv" "strings" "time" @@ -146,7 +147,8 @@ we have to rely on user password authentication for it.`, Help: "Your pcloud password.", IsPassword: true, Advanced: true, - }}...), + }, + }...), }) } @@ -161,15 +163,16 @@ type Options struct { // Fs represents a remote pcloud type Fs struct { - name string // name of this remote - root string // the path we are working on - opt Options // parsed options - features *fs.Features // optional features - srv *rest.Client // the connection to the server - cleanupSrv *rest.Client // the connection used for the cleanup method - dirCache *dircache.DirCache // Map of directory path to directory id - pacer *fs.Pacer // pacer for API calls - tokenRenewer *oauthutil.Renew // renew the token on expiry + name string // name of this remote + root string // the path we are working on + opt Options // parsed options + features *fs.Features // optional features + ts *oauthutil.TokenSource // the token source, used to create new clients + srv *rest.Client // the connection to the server + cleanupSrv *rest.Client // the connection used for the cleanup method + dirCache *dircache.DirCache // Map of directory path to directory id + pacer *fs.Pacer // pacer for API calls + tokenRenewer *oauthutil.Renew // renew the token on expiry } // Object describes a pcloud object @@ -317,6 +320,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e name: name, root: root, opt: *opt, + ts: ts, srv: rest.NewClient(oAuthClient).SetRoot("https://" + opt.Hostname), pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } @@ -326,6 +330,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.features = (&fs.Features{ CaseInsensitive: false, 
CanHaveEmptyDirectories: true, + PartialUploads: true, }).Fill(ctx, f) if !canCleanup { f.features.CleanUp = nil @@ -333,7 +338,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.srv.SetErrorHandler(errorHandler) // Renew the token in the background - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenew(f.String(), f.ts, func() error { _, err := f.readMetaDataForPath(ctx, "") return err }) @@ -375,6 +380,56 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return f, nil } +// OpenWriterAt opens with a handle for random access writes +// +// Pass in the remote desired and the size if known. +// +// It truncates any existing object +func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) { + client, err := f.newSingleConnClient(ctx) + if err != nil { + return nil, fmt.Errorf("create client: %w", err) + } + // init an empty file + leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true) + if err != nil { + return nil, fmt.Errorf("resolve src: %w", err) + } + openResult, err := fileOpenNew(ctx, client, f, directoryID, leaf) + if err != nil { + return nil, fmt.Errorf("open file: %w", err) + } + + writer := &writerAt{ + ctx: ctx, + client: client, + fs: f, + size: size, + remote: remote, + fd: openResult.FileDescriptor, + fileID: openResult.Fileid, + } + + return writer, nil +} + +// Create a new http client, accepting keep-alive headers, limited to single connection. +// Necessary for pcloud fileops API, as it binds the session to the underlying TCP connection. +// File descriptors are only valid within the same connection and auto-closed when the connection is closed, +// hence we need a separate client (with single connection) for each fd to avoid all sorts of errors and race conditions. +func (f *Fs) newSingleConnClient(ctx context.Context) (*rest.Client, error) { + baseClient := fshttp.NewClient(ctx) + baseClient.Transport = fshttp.NewTransportCustom(ctx, func(t *http.Transport) { + t.MaxConnsPerHost = 1 + t.DisableKeepAlives = false + }) + // Set our own http client in the context + ctx = oauthutil.Context(ctx, baseClient) + // create a new oauth client, re-use the token source + oAuthClient := oauth2.NewClient(ctx, f.ts) + return rest.NewClient(oAuthClient).SetRoot("https://" + f.opt.Hostname), nil +} + // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. 
@@ -1098,14 +1153,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
 	if err != nil {
 		return err
 	}
-	return o.setModTime(ctx, fileIDtoNumber(o.id), filename, directoryID, modTime)
-}
-
-func (o *Object) setModTime(
-	ctx context.Context,
-	fileID, filename, directoryID string,
-	modTime time.Time,
-) error {
+	fileID := fileIDtoNumber(o.id)
 	filename = o.fs.opt.Enc.FromStandardName(filename)
 	opts := rest.Opts{
 		Method:           "PUT",
@@ -1124,7 +1172,7 @@ func (o *Object) setModTime(
 	opts.Parameters.Set("mtime", strconv.FormatInt(modTime.Unix(), 10))
 
 	result := &api.ItemResult{}
-	err := o.fs.pacer.CallNoRetry(func() (bool, error) {
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		resp, err := o.fs.srv.CallJSON(ctx, &opts, nil, result)
 		err = result.Error.Update(err)
 		return shouldRetry(ctx, resp, err)
diff --git a/backend/pcloud/writer_at.go b/backend/pcloud/writer_at.go
new file mode 100644
index 000000000..ee3380a71
--- /dev/null
+++ b/backend/pcloud/writer_at.go
@@ -0,0 +1,216 @@
+package pcloud
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha1"
+	"encoding/hex"
+	"fmt"
+	"net/url"
+	"strconv"
+	"time"
+
+	"github.com/rclone/rclone/backend/pcloud/api"
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/lib/rest"
+)
+
+// writerAt implements fs.WriterAtCloser, adding the OpenWriterAt feature to pcloud.
+type writerAt struct {
+	ctx    context.Context
+	client *rest.Client
+	fs     *Fs
+	size   int64
+	remote string
+	fd     int64
+	fileID int64
+}
+
+// Close implements io.Closer: it closes the remote file descriptor and verifies the uploaded size.
+func (c *writerAt) Close() error {
+	// close fd
+	if _, err := c.fileClose(c.ctx); err != nil {
+		return fmt.Errorf("close fd: %w", err)
+	}
+
+	// Avoid a race condition: depending on the TCP connection, the size reported
+	// immediately after the write may be stale.
+	// Hence we verify the resulting size on a different connection.
+	if c.size < 0 {
+		// Without a known size we cannot verify the upload,
+		// so fall back to a 1s sleep as a best effort.
+		time.Sleep(1 * time.Second)
+		return nil
+	}
+	sizeOk := false
+	sizeLastSeen := int64(0)
+	for retry := 0; retry < 5; retry++ {
+		fs.Debugf(c.remote, "checking file size: try %d/5", retry)
+		obj, err := c.fs.NewObject(c.ctx, c.remote)
+		if err != nil {
+			return fmt.Errorf("get uploaded obj: %w", err)
+		}
+		sizeLastSeen = obj.Size()
+		if obj.Size() == c.size {
+			sizeOk = true
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+
+	if !sizeOk {
+		return fmt.Errorf("incorrect size after upload: got %d, want %d", sizeLastSeen, c.size)
+	}
+
+	return nil
+}
+
+// WriteAt implements io.WriterAt, skipping ranges whose checksum already matches the buffer.
+func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
+	contentLength := len(buffer)
+
+	inSHA1Bytes := sha1.Sum(buffer)
+	inSHA1 := hex.EncodeToString(inSHA1Bytes[:])
+
+	// get the checksum of the target range on the server
+	outChecksum, err := c.fileChecksum(c.ctx, offset, int64(contentLength))
+	if err != nil {
+		return 0, err
+	}
+	outSHA1 := outChecksum.SHA1
+
+	if outSHA1 == "" || inSHA1 == "" {
+		return 0, fmt.Errorf("expect both hashes to be filled: src: %q, target: %q", inSHA1, outSHA1)
+	}
+
+	// if the hashes match, the data is already there and the write can be skipped
+	if inSHA1 == outSHA1 {
+		return contentLength, nil
+	}
+
+	// otherwise upload the buffer at the given offset
+	if _, err := c.filePWrite(c.ctx, offset, buffer); err != nil {
+		return 0, err
+	}
+
+	return contentLength, nil
+}
+
+// Call pcloud file_open using folderid and name with O_CREAT and O_WRITE flags, see [API Doc].
+// [API Doc]: https://docs.pcloud.com/methods/fileops/file_open.html
+func fileOpenNew(ctx context.Context, c *rest.Client, srcFs *Fs, directoryID, filename string) (*api.FileOpenResponse, error) {
+	opts := rest.Opts{
+		Method:           "PUT",
+		Path:             "/file_open",
+		Parameters:       url.Values{},
+		TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
+		ExtraHeaders: map[string]string{
+			"Connection": "keep-alive",
+		},
+	}
+	filename = srcFs.opt.Enc.FromStandardName(filename)
+	opts.Parameters.Set("name", filename)
+	opts.Parameters.Set("folderid", dirIDtoNumber(directoryID))
+	opts.Parameters.Set("flags", "0x0042") // O_CREAT, O_WRITE
+
+	result := &api.FileOpenResponse{}
+	err := srcFs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := c.CallJSON(ctx, &opts, nil, result)
+		err = result.Error.Update(err)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("open new file descriptor: %w", err)
+	}
+	return result, nil
+}
+
+// Call pcloud file_checksum, see [API Doc].
+// [API Doc]: https://docs.pcloud.com/methods/fileops/file_checksum.html
+func (c *writerAt) fileChecksum(
+	ctx context.Context,
+	offset, count int64,
+) (*api.FileChecksumResponse, error) {
+	opts := rest.Opts{
+		Method:           "PUT",
+		Path:             "/file_checksum",
+		Parameters:       url.Values{},
+		TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
+		ExtraHeaders: map[string]string{
+			"Connection": "keep-alive",
+		},
+	}
+	opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
+	opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
+	opts.Parameters.Set("count", strconv.FormatInt(count, 10))
+
+	result := &api.FileChecksumResponse{}
+	err := c.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := c.client.CallJSON(ctx, &opts, nil, result)
+		err = result.Error.Update(err)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", c.fd, offset, count, err)
+	}
+	return result, nil
+}
+
+// Call pcloud file_pwrite, see [API Doc].
+// [API Doc]: https://docs.pcloud.com/methods/fileops/file_pwrite.html
+func (c *writerAt) filePWrite(
+	ctx context.Context,
+	offset int64,
+	buf []byte,
+) (*api.FilePWriteResponse, error) {
+	contentLength := int64(len(buf))
+	opts := rest.Opts{
+		Method:           "PUT",
+		Path:             "/file_pwrite",
+		Body:             bytes.NewReader(buf),
+		ContentLength:    &contentLength,
+		Parameters:       url.Values{},
+		TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
+		Close:            false,
+		ExtraHeaders: map[string]string{
+			"Connection": "keep-alive",
+		},
+	}
+	opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
+	opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
+
+	result := &api.FilePWriteResponse{}
+	err := c.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := c.client.CallJSON(ctx, &opts, nil, result)
+		err = result.Error.Update(err)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, c.fd, offset, err)
+	}
+	return result, nil
+}
+
+// Call pcloud file_close, see [API Doc].
+// [API Doc]: https://docs.pcloud.com/methods/fileops/file_close.html +func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error) { + opts := rest.Opts{ + Method: "PUT", + Path: "/file_close", + Parameters: url.Values{}, + TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding + Close: true, + } + opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10)) + + result := &api.FileCloseResponse{} + err := c.fs.pacer.CallNoRetry(func() (bool, error) { + resp, err := c.client.CallJSON(ctx, &opts, nil, result) + err = result.Error.Update(err) + return shouldRetry(ctx, resp, err) + }) + if err != nil { + return nil, fmt.Errorf("close file descriptor: %w", err) + } + return result, nil +}
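For reviewers, a minimal sketch (not part of the patch) of how a caller reaches the feature added here through rclone's optional-features mechanism; the helper writeInTwoParts, the remote name and the two buffers are hypothetical:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/rclone/rclone/fs"
)

// writeInTwoParts uploads two chunks at explicit offsets through the optional
// OpenWriterAt feature (which pcloud provides after this patch). The helper
// name, remote path and buffers are illustrative only.
func writeInTwoParts(ctx context.Context, f fs.Fs, remote string, part1, part2 []byte) error {
	// OpenWriterAt is an optional feature and is nil on backends without it.
	openWriterAt := f.Features().OpenWriterAt
	if openWriterAt == nil {
		return fmt.Errorf("%v: OpenWriterAt not supported", f)
	}
	size := int64(len(part1) + len(part2))
	out, err := openWriterAt(ctx, remote, size)
	if err != nil {
		return err
	}
	// Chunks may be written in any order; each WriteAt carries its own offset.
	if _, err := out.WriteAt(part2, int64(len(part1))); err != nil {
		_ = out.Close()
		return err
	}
	if _, err := out.WriteAt(part1, 0); err != nil {
		_ = out.Close()
		return err
	}
	// For pcloud, Close releases the file descriptor and, because the size is
	// known here, polls until the uploaded object reports the expected size.
	return out.Close()
}

func main() {
	// Wiring up a real fs.Fs is out of scope for this sketch.
	log.Println("see writeInTwoParts for the OpenWriterAt usage sketch")
}

Each writer created this way holds its own single-connection HTTP client, since pcloud ties the file descriptor to the underlying TCP connection; the descriptor should therefore only be used through the returned WriterAtCloser.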