yandex: implement streaming uploads (see #1614)

This commit is contained in:
Stefan Breunig 2017-08-19 14:07:23 +02:00
parent 323daae63e
commit a122b9fa7a
3 changed files with 44 additions and 22 deletions

View File

@ -813,8 +813,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
if size != -1 { if size != -1 {
chunks = int(size/chunkSize) + 1 chunks = int(size/chunkSize) + 1
} }
wc := &writeCounter{} in := fs.NewCountingReader(in0)
in := io.TeeReader(in0, wc)
fmtChunk := func(cur int, last bool) { fmtChunk := func(cur int, last bool) {
if chunks == 0 && last { if chunks == 0 && last {
@ -853,12 +852,12 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
// if the size is known, only upload full chunks. Remaining bytes are uploaded with // if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request. // the UploadSessionFinish request.
break break
} else if chunks == 0 && wc.Written-cursor.Offset < uint64(chunkSize) { } else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) {
// if the size is unknown, upload as long as we can read full chunks from the reader. // if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload. // The UploadSessionFinish request will not contain any payload.
break break
} }
cursor.Offset = wc.Written cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false) fmtChunk(currentChunk, false)
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize})
@ -871,7 +870,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
} }
// write the remains // write the remains
cursor.Offset = wc.Written cursor.Offset = in.BytesRead()
args := &files.UploadSessionFinishArg{ args := &files.UploadSessionFinishArg{
Cursor: &cursor, Cursor: &cursor,
Commit: commitInfo, Commit: commitInfo,
@ -929,17 +928,6 @@ func (o *Object) Remove() (err error) {
return err return err
} }
type writeCounter struct {
Written uint64
}
// Write implements the io.Writer interface.
func (wc *writeCounter) Write(p []byte) (int, error) {
n := len(p)
wc.Written += uint64(n)
return n, nil
}
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = (*Fs)(nil) _ fs.Fs = (*Fs)(nil)

28
fs/counting_reader.go Normal file
View File

@ -0,0 +1,28 @@
package fs
import "io"
// NewCountingReader returns a CountingReader, which will read from the given
// reader while keeping track of how many bytes were read.
func NewCountingReader(in io.Reader) *CountingReader {
return &CountingReader{in: in}
}
// CountingReader holds a reader and a read count of how many bytes were read
// so far.
type CountingReader struct {
in io.Reader
read uint64
}
// Read reads from the underlying reader.
func (cr *CountingReader) Read(b []byte) (int, error) {
n, err := cr.in.Read(b)
cr.read += uint64(n)
return n, err
}
// BytesRead returns how many bytes were read from the underlying reader so far.
func (cr *CountingReader) BytesRead() uint64 {
return cr.read
}

View File

@ -421,6 +421,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return o, o.Update(in, src, options...) return o, o.Update(in, src, options...)
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(dir string) error { func (f *Fs) Mkdir(dir string) error {
root := f.diskRoot root := f.diskRoot
@ -565,8 +570,8 @@ func (o *Object) remotePath() string {
// Copy the reader into the object updating modTime and size // Copy the reader into the object updating modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { func (o *Object) Update(in0 io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size() in := fs.NewCountingReader(in0)
modTime := src.ModTime() modTime := src.ModTime()
remote := o.remotePath() remote := o.remotePath()
@ -581,7 +586,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
err := o.fs.yd.Upload(in, remote, overwrite, mimeType) err := o.fs.yd.Upload(in, remote, overwrite, mimeType)
if err == nil { if err == nil {
//if file uploaded sucessfully then return metadata //if file uploaded sucessfully then return metadata
o.bytes = uint64(size) o.bytes = in.BytesRead()
o.modTime = modTime o.modTime = modTime
o.md5sum = "" // according to unit tests after put the md5 is empty. o.md5sum = "" // according to unit tests after put the md5 is empty.
//and set modTime of uploaded file //and set modTime of uploaded file
@ -648,9 +653,10 @@ func (o *Object) MimeType() string {
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = (*Fs)(nil) _ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil) _ fs.Purger = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
//_ fs.Copier = (*Fs)(nil) //_ fs.Copier = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil)
_ fs.Object = (*Object)(nil) _ fs.Object = (*Object)(nil)