mirror of
https://github.com/rclone/rclone.git
synced 2024-12-24 13:03:51 +08:00
plcoud: fix failing large file uploads - fixes #8147
This changes the OpenWriterAt implementation to make client/fd handling atomic. This PR stabilizes the situation of bigger files and multi-threaded uploads. The root cause boils down to the old "fun" property of pclouds fileops API: sessions are bound to TCP connections. This forces us to use a http client with only a single connection underneath. With large files, we reuse the same connection for each chunk. If that connection interrupts (e.g. because we are talking through the internet), all chunks will fail. The probability for latter one increases with larger files. As the point of the whole multi-threaded feature was to speed-up large files in the first place, this change pulls the client creation (and hence connection handling) into each chunk. This should stabilize the situation, as each chunk (and retry) gets its own connection.
This commit is contained in:
parent
ab58ae5b03
commit
40111ba5e1
|
@ -399,14 +399,15 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("open file: %w", err)
|
return nil, fmt.Errorf("open file: %w", err)
|
||||||
}
|
}
|
||||||
|
if _, err := fileClose(ctx, client, f.pacer, openResult.FileDescriptor); err != nil {
|
||||||
|
return nil, fmt.Errorf("close file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
writer := &writerAt{
|
writer := &writerAt{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
client: client,
|
|
||||||
fs: f,
|
fs: f,
|
||||||
size: size,
|
size: size,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
fd: openResult.FileDescriptor,
|
|
||||||
fileID: openResult.Fileid,
|
fileID: openResult.Fileid,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,21 +18,14 @@ import (
|
||||||
// writerAt implements fs.WriterAtCloser, adding the OpenWrtierAt feature to pcloud.
|
// writerAt implements fs.WriterAtCloser, adding the OpenWrtierAt feature to pcloud.
|
||||||
type writerAt struct {
|
type writerAt struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
client *rest.Client
|
|
||||||
fs *Fs
|
fs *Fs
|
||||||
size int64
|
size int64
|
||||||
remote string
|
remote string
|
||||||
fd int64
|
|
||||||
fileID int64
|
fileID int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements WriterAt.Close.
|
// Close implements WriterAt.Close.
|
||||||
func (c *writerAt) Close() error {
|
func (c *writerAt) Close() error {
|
||||||
// close fd
|
|
||||||
if _, err := c.fileClose(c.ctx); err != nil {
|
|
||||||
return fmt.Errorf("close fd: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Avoiding race conditions: Depending on the tcp connection, there might be
|
// Avoiding race conditions: Depending on the tcp connection, there might be
|
||||||
// caching issues when checking the size immediately after write.
|
// caching issues when checking the size immediately after write.
|
||||||
// Hence we try avoiding them by checking the resulting size on a different connection.
|
// Hence we try avoiding them by checking the resulting size on a different connection.
|
||||||
|
@ -72,8 +65,18 @@ func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
|
||||||
inSHA1Bytes := sha1.Sum(buffer)
|
inSHA1Bytes := sha1.Sum(buffer)
|
||||||
inSHA1 := hex.EncodeToString(inSHA1Bytes[:])
|
inSHA1 := hex.EncodeToString(inSHA1Bytes[:])
|
||||||
|
|
||||||
|
client, err := c.fs.newSingleConnClient(c.ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("create client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
openResult, err := fileOpen(c.ctx, client, c.fs, c.fileID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("open file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// get target hash
|
// get target hash
|
||||||
outChecksum, err := c.fileChecksum(c.ctx, offset, int64(contentLength))
|
outChecksum, err := fileChecksum(c.ctx, client, c.fs.pacer, openResult.FileDescriptor, offset, int64(contentLength))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -89,10 +92,15 @@ func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// upload buffer with offset if necessary
|
// upload buffer with offset if necessary
|
||||||
if _, err := c.filePWrite(c.ctx, offset, buffer); err != nil {
|
if _, err := filePWrite(c.ctx, client, c.fs.pacer, openResult.FileDescriptor, offset, buffer); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// close fd
|
||||||
|
if _, err := fileClose(c.ctx, client, c.fs.pacer, openResult.FileDescriptor); err != nil {
|
||||||
|
return contentLength, fmt.Errorf("close fd: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return contentLength, nil
|
return contentLength, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,11 +133,40 @@ func fileOpenNew(ctx context.Context, c *rest.Client, srcFs *Fs, directoryID, fi
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Call pcloud file_open using fileid with O_WRITE flags, see [API Doc.]
|
||||||
|
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_open.html
|
||||||
|
func fileOpen(ctx context.Context, c *rest.Client, srcFs *Fs, fileID int64) (*api.FileOpenResponse, error) {
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "PUT",
|
||||||
|
Path: "/file_open",
|
||||||
|
Parameters: url.Values{},
|
||||||
|
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||||
|
ExtraHeaders: map[string]string{
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
opts.Parameters.Set("fileid", strconv.FormatInt(fileID, 10))
|
||||||
|
opts.Parameters.Set("flags", "0x0002") // O_WRITE
|
||||||
|
|
||||||
|
result := &api.FileOpenResponse{}
|
||||||
|
err := srcFs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
|
resp, err := c.CallJSON(ctx, &opts, nil, result)
|
||||||
|
err = result.Error.Update(err)
|
||||||
|
return shouldRetry(ctx, resp, err)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open new file descriptor: %w", err)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Call pcloud file_checksum, see [API Doc.]
|
// Call pcloud file_checksum, see [API Doc.]
|
||||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_checksum.html
|
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_checksum.html
|
||||||
func (c *writerAt) fileChecksum(
|
func fileChecksum(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
offset, count int64,
|
client *rest.Client,
|
||||||
|
pacer *fs.Pacer,
|
||||||
|
fd, offset, count int64,
|
||||||
) (*api.FileChecksumResponse, error) {
|
) (*api.FileChecksumResponse, error) {
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
|
@ -140,26 +177,29 @@ func (c *writerAt) fileChecksum(
|
||||||
"Connection": "keep-alive",
|
"Connection": "keep-alive",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
|
||||||
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
||||||
opts.Parameters.Set("count", strconv.FormatInt(count, 10))
|
opts.Parameters.Set("count", strconv.FormatInt(count, 10))
|
||||||
|
|
||||||
result := &api.FileChecksumResponse{}
|
result := &api.FileChecksumResponse{}
|
||||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
err := pacer.CallNoRetry(func() (bool, error) {
|
||||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
resp, err := client.CallJSON(ctx, &opts, nil, result)
|
||||||
err = result.Error.Update(err)
|
err = result.Error.Update(err)
|
||||||
return shouldRetry(ctx, resp, err)
|
return shouldRetry(ctx, resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", c.fd, offset, count, err)
|
return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", fd, offset, count, err)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call pcloud file_pwrite, see [API Doc.]
|
// Call pcloud file_pwrite, see [API Doc.]
|
||||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_pwrite.html
|
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_pwrite.html
|
||||||
func (c *writerAt) filePWrite(
|
func filePWrite(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
client *rest.Client,
|
||||||
|
pacer *fs.Pacer,
|
||||||
|
fd int64,
|
||||||
offset int64,
|
offset int64,
|
||||||
buf []byte,
|
buf []byte,
|
||||||
) (*api.FilePWriteResponse, error) {
|
) (*api.FilePWriteResponse, error) {
|
||||||
|
@ -176,24 +216,29 @@ func (c *writerAt) filePWrite(
|
||||||
"Connection": "keep-alive",
|
"Connection": "keep-alive",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
|
||||||
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
|
||||||
|
|
||||||
result := &api.FilePWriteResponse{}
|
result := &api.FilePWriteResponse{}
|
||||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
err := pacer.CallNoRetry(func() (bool, error) {
|
||||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
resp, err := client.CallJSON(ctx, &opts, nil, result)
|
||||||
err = result.Error.Update(err)
|
err = result.Error.Update(err)
|
||||||
return shouldRetry(ctx, resp, err)
|
return shouldRetry(ctx, resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, c.fd, offset, err)
|
return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, fd, offset, err)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call pcloud file_close, see [API Doc.]
|
// Call pcloud file_close, see [API Doc.]
|
||||||
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_close.html
|
// [API Doc]: https://docs.pcloud.com/methods/fileops/file_close.html
|
||||||
func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error) {
|
func fileClose(
|
||||||
|
ctx context.Context,
|
||||||
|
client *rest.Client,
|
||||||
|
pacer *fs.Pacer,
|
||||||
|
fd int64,
|
||||||
|
) (*api.FileCloseResponse, error) {
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Path: "/file_close",
|
Path: "/file_close",
|
||||||
|
@ -201,11 +246,11 @@ func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error
|
||||||
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
|
||||||
Close: true,
|
Close: true,
|
||||||
}
|
}
|
||||||
opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
|
opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
|
||||||
|
|
||||||
result := &api.FileCloseResponse{}
|
result := &api.FileCloseResponse{}
|
||||||
err := c.fs.pacer.CallNoRetry(func() (bool, error) {
|
err := pacer.CallNoRetry(func() (bool, error) {
|
||||||
resp, err := c.client.CallJSON(ctx, &opts, nil, result)
|
resp, err := client.CallJSON(ctx, &opts, nil, result)
|
||||||
err = result.Error.Update(err)
|
err = result.Error.Update(err)
|
||||||
return shouldRetry(ctx, resp, err)
|
return shouldRetry(ctx, resp, err)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue
Block a user