sftp: implement connection pooling for multiple ssh connections

A connection may be opened for each `--transfers` and `--checkers`
now.  Connections are checked when putting them in the pool and
getting them out the pool so it should recover from network errors
much better.

This fixes #1561, fixes #1541, fixes #1381, fixes #1158, fixes #1538
This commit is contained in:
Nick Craig-Wood 2017-08-07 17:19:37 +01:00
parent bfe812ea6b
commit 47eab397ba

View File

@ -11,6 +11,7 @@ import (
"path"
"regexp"
"strings"
"sync"
"time"
"github.com/ncw/rclone/fs"
@ -60,11 +61,14 @@ type Fs struct {
name string
root string
features *fs.Features // optional features
config *ssh.ClientConfig
host string
port string
url string
sshClient *ssh.Client
sftpClient *sftp.Client
mkdirLock *stringLock
cachedHashes *fs.HashSet
poolMu sync.Mutex
pool []*conn
}
// Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading)
@ -100,6 +104,114 @@ func Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) {
return ssh.NewClient(c, chans, reqs), nil
}
// conn encapsulates an ssh client and corresponding sftp client
type conn struct {
sshClient *ssh.Client
sftpClient *sftp.Client
err chan error
}
// Wait for connection to close
func (c *conn) wait() {
c.err <- c.sshClient.Conn.Wait()
}
// Closes the connection
func (c *conn) close() error {
sftpErr := c.sftpClient.Close()
sshErr := c.sshClient.Close()
if sftpErr != nil {
return sftpErr
}
return sshErr
}
// Returns an error if closed
func (c *conn) closed() error {
select {
case err := <-c.err:
return err
default:
}
return nil
}
// Open a new connection to the SFTP server.
func (f *Fs) sftpConnection() (c *conn, err error) {
c = &conn{
err: make(chan error, 1),
}
c.sshClient, err = Dial("tcp", f.host+":"+f.port, f.config)
if err != nil {
return nil, errors.Wrap(err, "couldn't connect SSH")
}
c.sftpClient, err = sftp.NewClient(c.sshClient)
if err != nil {
_ = c.sshClient.Close()
return nil, errors.Wrap(err, "couldn't initialise SFTP")
}
go c.wait()
return c, nil
}
// Get an SFTP connection from the pool, or open a new one
func (f *Fs) getSftpConnection() (c *conn, err error) {
f.poolMu.Lock()
for len(f.pool) > 0 {
c = f.pool[0]
f.pool = f.pool[1:]
err := c.closed()
if err == nil {
break
}
fs.Errorf(f, "Discarding closed SSH connection: %v", err)
c = nil
}
f.poolMu.Unlock()
if c != nil {
return c, nil
}
return f.sftpConnection()
}
// Return an SFTP connection to the pool
//
// It nils the pointed to connection out so it can't be reused
//
// if err is not nil then it checks the connection is alive using a
// Getwd request
func (f *Fs) putSftpConnection(pc **conn, err error) {
c := *pc
*pc = nil
if err != nil {
// work out if this is an expected error
underlyingErr := errors.Cause(err)
isRegularError := false
switch underlyingErr {
case os.ErrNotExist:
isRegularError = true
default:
switch underlyingErr.(type) {
case *sftp.StatusError, *os.PathError:
isRegularError = true
}
}
// If not a regular SFTP error code then check the connection
if !isRegularError {
_, nopErr := c.sftpClient.Getwd()
if nopErr != nil {
fs.Debugf(f, "Connection failed, closing: %v", nopErr)
_ = c.close()
return
}
fs.Debugf(f, "Connection OK after error: %v", err)
}
}
f.poolMu.Lock()
f.pool = append(f.pool, c)
f.poolMu.Unlock()
}
// NewFs creates a new Fs object from the name and root. It connects to
// the host specified in the config file.
func NewFs(name, root string) (fs.Fs, error) {
@ -156,24 +268,22 @@ func NewFs(name, root string) (fs.Fs, error) {
config.Auth = append(config.Auth, ssh.Password(clearpass))
}
sshClient, err := Dial("tcp", host+":"+port, config)
if err != nil {
return nil, errors.Wrap(err, "couldn't connect ssh")
}
sftpClient, err := sftp.NewClient(sshClient)
if err != nil {
_ = sshClient.Close()
return nil, errors.Wrap(err, "couldn't initialise SFTP")
}
f := &Fs{
name: name,
root: root,
sshClient: sshClient,
sftpClient: sftpClient,
url: "sftp://" + user + "@" + host + ":" + port + "/" + root,
mkdirLock: newStringLock(),
name: name,
root: root,
config: config,
host: host,
port: port,
url: "sftp://" + user + "@" + host + ":" + port + "/" + root,
mkdirLock: newStringLock(),
}
f.features = (&fs.Features{}).Fill(f)
// Make a connection and pool it to return errors early
c, err := f.getSftpConnection()
if err != nil {
return nil, errors.Wrap(err, "NewFs")
}
f.putSftpConnection(&c, nil)
if root != "" {
// Check to see if the root actually an existing file
remote := path.Base(root)
@ -193,11 +303,6 @@ func NewFs(name, root string) (fs.Fs, error) {
// return an error with an fs which points to the parent
return f, fs.ErrorIsFile
}
go func() {
// FIXME re-open the connection here...
err := f.sshClient.Conn.Wait()
fs.Errorf(f, "SSH connection closed: %v", err)
}()
return f, nil
}
@ -245,7 +350,12 @@ func (f *Fs) dirExists(dir string) (bool, error) {
if dir == "" {
dir = "."
}
info, err := f.sftpClient.Stat(dir)
c, err := f.getSftpConnection()
if err != nil {
return false, errors.Wrap(err, "dirExists")
}
info, err := c.sftpClient.Stat(dir)
f.putSftpConnection(&c, err)
if err != nil {
if os.IsNotExist(err) {
return false, nil
@ -280,7 +390,12 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
if sftpDir == "" {
sftpDir = "."
}
infos, err := f.sftpClient.ReadDir(sftpDir)
c, err := f.getSftpConnection()
if err != nil {
return nil, errors.Wrap(err, "List")
}
infos, err := c.sftpClient.ReadDir(sftpDir)
f.putSftpConnection(&c, err)
if err != nil {
return nil, errors.Wrapf(err, "error listing %q", dir)
}
@ -350,7 +465,12 @@ func (f *Fs) mkdir(dirPath string) error {
if err != nil {
return err
}
err = f.sftpClient.Mkdir(dirPath)
c, err := f.getSftpConnection()
if err != nil {
return errors.Wrap(err, "mkdir")
}
err = c.sftpClient.Mkdir(dirPath)
f.putSftpConnection(&c, err)
if err != nil {
return errors.Wrapf(err, "mkdir %q failed", dirPath)
}
@ -366,7 +486,13 @@ func (f *Fs) Mkdir(dir string) error {
// Rmdir removes the root directory of the Fs object
func (f *Fs) Rmdir(dir string) error {
root := path.Join(f.root, dir)
return f.sftpClient.Remove(root)
c, err := f.getSftpConnection()
if err != nil {
return errors.Wrap(err, "Rmdir")
}
err = c.sftpClient.Remove(root)
f.putSftpConnection(&c, err)
return err
}
// Move renames a remote sftp file object
@ -380,10 +506,15 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
if err != nil {
return nil, errors.Wrap(err, "Move mkParentDir failed")
}
err = f.sftpClient.Rename(
c, err := f.getSftpConnection()
if err != nil {
return nil, errors.Wrap(err, "Move")
}
err = c.sftpClient.Rename(
srcObj.path(),
path.Join(f.root, remote),
)
f.putSftpConnection(&c, err)
if err != nil {
return nil, errors.Wrap(err, "Move Rename failed")
}
@ -427,10 +558,15 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
}
// Do the move
err = f.sftpClient.Rename(
c, err := f.getSftpConnection()
if err != nil {
return errors.Wrap(err, "DirMove")
}
err = c.sftpClient.Rename(
srcPath,
dstPath,
)
f.putSftpConnection(&c, err)
if err != nil {
return errors.Wrapf(err, "DirMove Rename(%q,%q) failed", srcPath, dstPath)
}
@ -443,7 +579,13 @@ func (f *Fs) Hashes() fs.HashSet {
return *f.cachedHashes
}
session, err := f.sshClient.NewSession()
c, err := f.getSftpConnection()
if err != nil {
fs.Errorf(f, "Couldn't get SSH connection to figure out Hashes: %v", err)
return fs.HashSet(fs.HashNone)
}
defer f.putSftpConnection(&c, err)
session, err := c.sshClient.NewSession()
if err != nil {
return fs.HashSet(fs.HashNone)
}
@ -451,7 +593,7 @@ func (f *Fs) Hashes() fs.HashSet {
expectedSha1 := "03cfd743661f07975fa2f1220c5194cbaff48451"
_ = session.Close()
session, err = f.sshClient.NewSession()
session, err = c.sshClient.NewSession()
if err != nil {
return fs.HashSet(fs.HashNone)
}
@ -505,7 +647,12 @@ func (o *Object) Hash(r fs.HashType) (string, error) {
return *o.sha1sum, nil
}
session, err := o.fs.sshClient.NewSession()
c, err := o.fs.getSftpConnection()
if err != nil {
return "", errors.Wrap(err, "Hash")
}
session, err := c.sshClient.NewSession()
o.fs.putSftpConnection(&c, err)
if err != nil {
o.fs.cachedHashes = nil // Something has changed on the remote system
return "", fs.ErrHashUnsupported
@ -576,7 +723,12 @@ func (o *Object) setMetadata(info os.FileInfo) {
// stat updates the info in the Object
func (o *Object) stat() error {
info, err := o.fs.sftpClient.Stat(o.path())
c, err := o.fs.getSftpConnection()
if err != nil {
return errors.Wrap(err, "stat")
}
info, err := c.sftpClient.Stat(o.path())
o.fs.putSftpConnection(&c, err)
if err != nil {
if os.IsNotExist(err) {
return fs.ErrorObjectNotFound
@ -594,7 +746,12 @@ func (o *Object) stat() error {
//
// it also updates the info field
func (o *Object) SetModTime(modTime time.Time) error {
err := o.fs.sftpClient.Chtimes(o.path(), modTime, modTime)
c, err := o.fs.getSftpConnection()
if err != nil {
return errors.Wrap(err, "SetModTime")
}
err = c.sftpClient.Chtimes(o.path(), modTime, modTime)
o.fs.putSftpConnection(&c, err)
if err != nil {
return errors.Wrap(err, "SetModTime failed")
}
@ -636,7 +793,12 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
}
}
}
sftpFile, err := o.fs.sftpClient.Open(o.path())
c, err := o.fs.getSftpConnection()
if err != nil {
return nil, errors.Wrap(err, "Open")
}
sftpFile, err := c.sftpClient.Open(o.path())
o.fs.putSftpConnection(&c, err)
if err != nil {
return nil, errors.Wrap(err, "Open failed")
}
@ -655,13 +817,24 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Update a remote sftp file using the data <in> and ModTime from <src>
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
file, err := o.fs.sftpClient.Create(o.path())
c, err := o.fs.getSftpConnection()
if err != nil {
return errors.Wrap(err, "Update")
}
file, err := c.sftpClient.Create(o.path())
o.fs.putSftpConnection(&c, err)
if err != nil {
return errors.Wrap(err, "Update Create failed")
}
// remove the file if upload failed
remove := func() {
removeErr := o.fs.sftpClient.Remove(o.path())
c, removeErr := o.fs.getSftpConnection()
if removeErr != nil {
fs.Debugf(src, "Failed to open new SSH connection for delete: %v", removeErr)
return
}
removeErr = c.sftpClient.Remove(o.path())
o.fs.putSftpConnection(&c, removeErr)
if removeErr != nil {
fs.Debugf(src, "Failed to remove: %v", removeErr)
} else {
@ -687,7 +860,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
// Remove a remote sftp file object
func (o *Object) Remove() error {
return o.fs.sftpClient.Remove(o.path())
c, err := o.fs.getSftpConnection()
if err != nil {
return errors.Wrap(err, "Remove")
}
err = c.sftpClient.Remove(o.path())
o.fs.putSftpConnection(&c, err)
return err
}
// Check the interfaces are satisfied