rcat: fix goroutine leak

This was leaking goroutines in the short file case beause it wasn't
calling Close() on the Account object.  This became apparent when
testing with mount.
This commit is contained in:
Nick Craig-Wood 2017-11-11 18:43:00 +00:00
parent de98e2480d
commit 46947b3b9b

View File

@ -1518,12 +1518,7 @@ func CleanUp(f Fs) error {
// wrap a Reader and a Closer together into a ReadCloser // wrap a Reader and a Closer together into a ReadCloser
type readCloser struct { type readCloser struct {
io.Reader io.Reader
Closer io.Closer io.Closer
}
// Close the Closer
func (r *readCloser) Close() error {
return r.Closer.Close()
} }
// Cat any files to the io.Writer // Cat any files to the io.Writer
@ -1586,11 +1581,12 @@ func Cat(f Fs, w io.Writer, offset, count int64) error {
} }
// Rcat reads data from the Reader until EOF and uploads it to a file on remote // Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (dst Object, err error) { func Rcat(fdst Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst Object, err error) {
Stats.Transferring(dstFileName) Stats.Transferring(dstFileName)
in = NewAccountSizeName(in, -1, dstFileName).WithBuffer()
defer func() { defer func() {
Stats.DoneTransferring(dstFileName, err == nil) Stats.DoneTransferring(dstFileName, err == nil)
if otherErr := in0.Close(); otherErr != nil { if otherErr := in.Close(); otherErr != nil {
Debugf(fdst, "Rcat: failed to close source: %v", err) Debugf(fdst, "Rcat: failed to close source: %v", err)
} }
}() }()
@ -1600,7 +1596,7 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (ds
if err != nil { if err != nil {
return nil, err return nil, err
} }
readCounter := NewCountingReader(in0) readCounter := NewCountingReader(in)
trackingIn := io.TeeReader(readCounter, hash) trackingIn := io.TeeReader(readCounter, hash)
compare := func(dst Object) error { compare := func(dst Object) error {
@ -1619,7 +1615,6 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (ds
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
in := ioutil.NopCloser(bytes.NewReader(buf[:n])) in := ioutil.NopCloser(bytes.NewReader(buf[:n]))
in = NewAccountSizeName(in, int64(n), dstFileName).WithBuffer()
objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil) objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil)
if Config.DryRun { if Config.DryRun {
Logf("stdin", "Not uploading as --dry-run") Logf("stdin", "Not uploading as --dry-run")
@ -1631,7 +1626,12 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (ds
} }
return dst, compare(dst) return dst, compare(dst)
} }
in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn))
// Make a new ReadCloser with the bits we've already read
in = &readCloser{
Reader: io.MultiReader(bytes.NewReader(buf), trackingIn),
Closer: in,
}
fStreamTo := fdst fStreamTo := fdst
canStream := fdst.Features().PutStream != nil canStream := fdst.Features().PutStream != nil
@ -1650,8 +1650,6 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (ds
fStreamTo = tmpLocalFs fStreamTo = tmpLocalFs
} }
in = NewAccountSizeName(in, -1, dstFileName).WithBuffer()
if Config.DryRun { if Config.DryRun {
Logf("stdin", "Not uploading as --dry-run") Logf("stdin", "Not uploading as --dry-run")
// prevents "broken pipe" errors // prevents "broken pipe" errors