diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 1fe1553f7..176e24dd9 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -125,12 +125,6 @@ type Object struct { sha1sum *string // Cached SHA1 checksum } -// ObjectReader holds the sftp.File interface to a remote SFTP file opened for reading -type ObjectReader struct { - object *Object - sftpFile *sftp.File -} - // readCurrentUser finds the current user name or "" if not found func readCurrentUser() (userName string) { usr, err := user.Current() @@ -877,15 +871,49 @@ func (o *Object) Storable() bool { return o.mode.IsRegular() } +// objectReader represents a file open for reading on the SFTP server +type objectReader struct { + sftpFile *sftp.File + pipeReader *io.PipeReader + done chan struct{} +} + +func newObjectReader(sftpFile *sftp.File) *objectReader { + pipeReader, pipeWriter := io.Pipe() + file := &objectReader{ + sftpFile: sftpFile, + pipeReader: pipeReader, + done: make(chan struct{}), + } + + go func() { + // Use sftpFile.WriteTo to pump data so that it gets a + // chance to build the window up. + _, err := sftpFile.WriteTo(pipeWriter) + // Close the pipeWriter so the pipeReader fails with + // the same error or EOF if err == nil + _ = pipeWriter.CloseWithError(err) + // signal that we've finished + close(file.done) + }() + + return file +} + // Read from a remote sftp file object reader -func (file *ObjectReader) Read(p []byte) (n int, err error) { - n, err = file.sftpFile.Read(p) +func (file *objectReader) Read(p []byte) (n int, err error) { + n, err = file.pipeReader.Read(p) return n, err } // Close a reader of a remote sftp file -func (file *ObjectReader) Close() (err error) { +func (file *objectReader) Close() (err error) { + // Close the sftpFile - this will likely cause the WriteTo to error err = file.sftpFile.Close() + // Close the pipeReader so writes to the pipeWriter fail + _ = file.pipeReader.Close() + // Wait for the background process to finish + <-file.done return err } @@ -919,10 +947,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { return nil, errors.Wrap(err, "Open Seek failed") } } - in = readers.NewLimitedReadCloser(&ObjectReader{ - object: o, - sftpFile: sftpFile, - }, limit) + in = readers.NewLimitedReadCloser(newObjectReader(sftpFile), limit) return in, nil }