diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 474163866..7a7c37e8f 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -332,6 +332,25 @@ cost of using more memory. `, Default: 64, Advanced: true, + }, { + Name: "connections", + Help: strings.Replace(`Maximum number of SFTP simultaneous connections, 0 for unlimited. + +Note that setting this is very likely to cause deadlocks so it should +be used with care. + +If you are doing a sync or copy then make sure concurrency is one more +than the sum of |--transfers| and |--checkers|. + +If you use |--check-first| then it just needs to be one more than the +maximum of |--checkers| and |--transfers|. + +So for |concurrency 3| you'd use |--checkers 2 --transfers 2 +--check-first| or |--checkers 1 --transfers 1|. + +`, "|", "`", -1), + Default: 0, + Advanced: true, }, { Name: "set_env", Default: fs.SpaceSepList{}, @@ -502,6 +521,7 @@ type Options struct { IdleTimeout fs.Duration `config:"idle_timeout"` ChunkSize fs.SizeSuffix `config:"chunk_size"` Concurrency int `config:"concurrency"` + Connections int `config:"connections"` SetEnv fs.SpaceSepList `config:"set_env"` Ciphers fs.SpaceSepList `config:"ciphers"` KeyExchange fs.SpaceSepList `config:"key_exchange"` @@ -533,6 +553,7 @@ type Fs struct { pacer *fs.Pacer // pacer for operations savedpswd string sessions atomic.Int32 // count in use sessions + tokens *pacer.TokenDispenser } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -695,6 +716,9 @@ func (f *Fs) newSftpClient(client sshClient, opts ...sftp.ClientOption) (*sftp.C // Get an SFTP connection from the pool, or open a new one func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) { accounting.LimitTPS(ctx) + if f.opt.Connections > 0 { + f.tokens.Get() + } f.poolMu.Lock() for len(f.pool) > 0 { c = f.pool[0] @@ -717,6 +741,9 @@ func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) { } return false, nil }) + if f.opt.Connections > 0 && c == nil { + f.tokens.Put() + } return c, err } @@ -727,6 +754,9 @@ func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) { // if err is not nil then it checks the connection is alive using a // Getwd request func (f *Fs) putSftpConnection(pc **conn, err error) { + if f.opt.Connections > 0 { + defer f.tokens.Put() + } c := *pc if !c.sshClient.CanReuse() { return @@ -812,6 +842,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e if len(opt.SSH) != 0 && ((opt.User != currentUser && opt.User != "") || opt.Host != "" || (opt.Port != "22" && opt.Port != "")) { fs.Logf(name, "--sftp-ssh is in use - ignoring user/host/port from config - set in the parameters to --sftp-ssh (remove them from the config to silence this warning)") } + f.tokens = pacer.NewTokenDispenser(opt.Connections) if opt.User == "" { opt.User = currentUser