drive: use multipart resumable uploads for streaming and uploads in mount

Before this change we used non multipart uploads for files of unknown
size (streaming and uploads in mount).  This is slower and less
reliable and is not recommended by Google for files smaller than 5MB.

After this change we use multipart resumable uploads for all files of
unknown length.  This will use an extra transaction so is less
efficient for files under the chunk size, however the natural
buffering in the operations.Rcat call specified by
`--streaming-upload-cutoff` will overcome this.

See: https://forum.rclone.org/t/upload-behaviour-and-speed-when-using-vfs-cache/9920/
This commit is contained in:
Nick Craig-Wood 2019-05-11 10:03:51 +01:00
parent 36157d8ae5
commit 6757244918
2 changed files with 39 additions and 46 deletions

View File

@ -1851,7 +1851,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
}
var info *drive.File
if size == 0 || size < int64(f.opt.UploadCutoff) {
if size >= 0 && size < int64(f.opt.UploadCutoff) {
// Make the API request to upload metadata and file data.
// Don't retry, return a retry error instead
err = f.pacer.CallNoRetry(func() (bool, error) {
@ -2845,7 +2845,7 @@ func (o *baseObject) update(ctx context.Context, updateInfo *drive.File, uploadM
src fs.ObjectInfo) (info *drive.File, err error) {
// Make the API request to upload metadata and file data.
size := src.Size()
if size == 0 || size < int64(o.fs.opt.UploadCutoff) {
if size >= 0 && size < int64(o.fs.opt.UploadCutoff) {
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
info, err = o.fs.svc.Files.Update(o.id, updateInfo).

View File

@ -11,16 +11,15 @@
package drive
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/readers"
@ -88,7 +87,9 @@ func (f *Fs) Upload(ctx context.Context, in io.Reader, size int64, contentType,
})
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
req.Header.Set("X-Upload-Content-Type", contentType)
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
if size >= 0 {
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
}
res, err = f.client.Do(req)
if err == nil {
defer googleapi.CloseBody(res)
@ -116,49 +117,19 @@ func (rx *resumableUpload) makeRequest(ctx context.Context, start int64, body io
req, _ := http.NewRequest("POST", rx.URI, body)
req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext
req.ContentLength = reqSize
totalSize := "*"
if rx.ContentLength >= 0 {
totalSize = strconv.FormatInt(rx.ContentLength, 10)
}
if reqSize != 0 {
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength))
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, totalSize))
} else {
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength))
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", totalSize))
}
req.Header.Set("Content-Type", rx.MediaType)
return req
}
// rangeRE matches the transfer status response from the server. $1 is
// the last byte index uploaded.
var rangeRE = regexp.MustCompile(`^0\-(\d+)$`)
// Query drive for the amount transferred so far
//
// If error is nil, then start should be valid
func (rx *resumableUpload) transferStatus(ctx context.Context) (start int64, err error) {
req := rx.makeRequest(ctx, 0, nil, 0)
res, err := rx.f.client.Do(req)
if err != nil {
return 0, err
}
defer googleapi.CloseBody(res)
if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK {
return rx.ContentLength, nil
}
if res.StatusCode != statusResumeIncomplete {
err = googleapi.CheckResponse(res)
if err != nil {
return 0, err
}
return 0, errors.Errorf("unexpected http return code %v", res.StatusCode)
}
Range := res.Header.Get("Range")
if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 {
start, err = strconv.ParseInt(m[1], 10, 64)
if err == nil {
return start, nil
}
}
return 0, errors.Errorf("unable to parse range %q", Range)
}
// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil
func (rx *resumableUpload) transferChunk(ctx context.Context, start int64, chunk io.ReadSeeker, chunkSize int64) (int, error) {
_, _ = chunk.Seek(0, io.SeekStart)
@ -200,12 +171,34 @@ func (rx *resumableUpload) Upload(ctx context.Context) (*drive.File, error) {
var StatusCode int
var err error
buf := make([]byte, int(rx.f.opt.ChunkSize))
for start < rx.ContentLength {
reqSize := rx.ContentLength - start
if reqSize >= int64(rx.f.opt.ChunkSize) {
reqSize = int64(rx.f.opt.ChunkSize)
for finished := false; !finished; {
var reqSize int64
var chunk io.ReadSeeker
if rx.ContentLength >= 0 {
// If size known use repeatable reader for smoother bwlimit
if start >= rx.ContentLength {
break
}
reqSize = rx.ContentLength - start
if reqSize >= int64(rx.f.opt.ChunkSize) {
reqSize = int64(rx.f.opt.ChunkSize)
}
chunk = readers.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize)
} else {
// If size unknown read into buffer
var n int
n, err = readers.ReadFill(rx.Media, buf)
if err == io.EOF {
// Send the last chunk with the correct ContentLength
// otherwise Google doesn't know we've finished
rx.ContentLength = start + int64(n)
finished = true
} else if err != nil {
return nil, err
}
reqSize = int64(n)
chunk = bytes.NewReader(buf[:reqSize])
}
chunk := readers.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize)
// Transfer the chunk
err = rx.f.pacer.Call(func() (bool, error) {