mirror of
https://github.com/rclone/rclone.git
synced 2025-02-01 13:32:01 +08:00
ftp: fix timeout after long uploads #5596
This commit is contained in:
parent
a32fde09ca
commit
29c6c86c00
|
@ -128,6 +128,11 @@ Enabled by default. Use 0 to disable.`,
|
||||||
Help: "Disable TLS 1.3 (workaround for FTP servers with buggy TLS)",
|
Help: "Disable TLS 1.3 (workaround for FTP servers with buggy TLS)",
|
||||||
Default: false,
|
Default: false,
|
||||||
Advanced: true,
|
Advanced: true,
|
||||||
|
}, {
|
||||||
|
Name: "shut_timeout",
|
||||||
|
Help: "Maximum time to wait for data connection closing status.",
|
||||||
|
Default: fs.Duration(60 * time.Second),
|
||||||
|
Advanced: true,
|
||||||
}, {
|
}, {
|
||||||
Name: config.ConfigEncoding,
|
Name: config.ConfigEncoding,
|
||||||
Help: config.ConfigEncodingHelp,
|
Help: config.ConfigEncodingHelp,
|
||||||
|
@ -166,6 +171,7 @@ type Options struct {
|
||||||
DisableMLSD bool `config:"disable_mlsd"`
|
DisableMLSD bool `config:"disable_mlsd"`
|
||||||
IdleTimeout fs.Duration `config:"idle_timeout"`
|
IdleTimeout fs.Duration `config:"idle_timeout"`
|
||||||
CloseTimeout fs.Duration `config:"close_timeout"`
|
CloseTimeout fs.Duration `config:"close_timeout"`
|
||||||
|
ShutTimeout fs.Duration `config:"shut_timeout"`
|
||||||
Enc encoder.MultiEncoder `config:"encoding"`
|
Enc encoder.MultiEncoder `config:"encoding"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -311,6 +317,9 @@ func (f *Fs) ftpConnection(ctx context.Context) (c *ftp.ServerConn, err error) {
|
||||||
if f.opt.DisableMLSD {
|
if f.opt.DisableMLSD {
|
||||||
ftpConfig = append(ftpConfig, ftp.DialWithDisabledMLSD(true))
|
ftpConfig = append(ftpConfig, ftp.DialWithDisabledMLSD(true))
|
||||||
}
|
}
|
||||||
|
if f.opt.ShutTimeout != 0 && f.opt.ShutTimeout != fs.DurationOff {
|
||||||
|
ftpConfig = append(ftpConfig, ftp.DialWithShutTimeout(time.Duration(f.opt.ShutTimeout)))
|
||||||
|
}
|
||||||
if f.ci.Dump&(fs.DumpHeaders|fs.DumpBodies|fs.DumpRequests|fs.DumpResponses) != 0 {
|
if f.ci.Dump&(fs.DumpHeaders|fs.DumpBodies|fs.DumpRequests|fs.DumpResponses) != 0 {
|
||||||
ftpConfig = append(ftpConfig, ftp.DialWithDebugOutput(&debugLog{auth: f.ci.Dump&fs.DumpAuth != 0}))
|
ftpConfig = append(ftpConfig, ftp.DialWithDebugOutput(&debugLog{auth: f.ci.Dump&fs.DumpAuth != 0}))
|
||||||
}
|
}
|
||||||
|
@ -990,7 +999,11 @@ func (f *ftpReadCloser) Close() error {
|
||||||
errchan <- f.rc.Close()
|
errchan <- f.rc.Close()
|
||||||
}()
|
}()
|
||||||
// Wait for Close for up to 60 seconds by default
|
// Wait for Close for up to 60 seconds by default
|
||||||
timer := time.NewTimer(time.Duration(f.f.opt.CloseTimeout))
|
closeTimeout := f.f.opt.CloseTimeout
|
||||||
|
if closeTimeout == 0 {
|
||||||
|
closeTimeout = fs.DurationOff
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(time.Duration(closeTimeout))
|
||||||
select {
|
select {
|
||||||
case err = <-errchan:
|
case err = <-errchan:
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
|
98
backend/ftp/ftp_internal_test.go
Normal file
98
backend/ftp/ftp_internal_test.go
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
package ftp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/config/configmap"
|
||||||
|
"github.com/rclone/rclone/fs/object"
|
||||||
|
"github.com/rclone/rclone/fstest"
|
||||||
|
"github.com/rclone/rclone/fstest/fstests"
|
||||||
|
"github.com/rclone/rclone/lib/readers"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type settings map[string]interface{}
|
||||||
|
|
||||||
|
func deriveFs(ctx context.Context, t *testing.T, f fs.Fs, opts settings) fs.Fs {
|
||||||
|
fsName := strings.Split(f.Name(), "{")[0] // strip off hash
|
||||||
|
configMap := configmap.Simple{}
|
||||||
|
for key, val := range opts {
|
||||||
|
configMap[key] = fmt.Sprintf("%v", val)
|
||||||
|
}
|
||||||
|
remote := fmt.Sprintf("%s,%s:%s", fsName, configMap.String(), f.Root())
|
||||||
|
fixFs, err := fs.NewFs(ctx, remote)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return fixFs
|
||||||
|
}
|
||||||
|
|
||||||
|
// test that big file uploads do not cause network i/o timeout
|
||||||
|
func (f *Fs) testUploadTimeout(t *testing.T) {
|
||||||
|
const (
|
||||||
|
fileSize = 100000000 // 100 MiB
|
||||||
|
idleTimeout = 40 * time.Millisecond // small because test server is local
|
||||||
|
maxTime = 5 * time.Second // prevent test hangup
|
||||||
|
)
|
||||||
|
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("not running with -short")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ci := fs.GetConfig(ctx)
|
||||||
|
saveLowLevelRetries := ci.LowLevelRetries
|
||||||
|
saveTimeout := ci.Timeout
|
||||||
|
defer func() {
|
||||||
|
ci.LowLevelRetries = saveLowLevelRetries
|
||||||
|
ci.Timeout = saveTimeout
|
||||||
|
}()
|
||||||
|
ci.LowLevelRetries = 1
|
||||||
|
ci.Timeout = idleTimeout
|
||||||
|
|
||||||
|
upload := func(concurrency int, shutTimeout time.Duration) (obj fs.Object, err error) {
|
||||||
|
fixFs := deriveFs(ctx, t, f, settings{
|
||||||
|
"concurrency": concurrency,
|
||||||
|
"shut_timeout": shutTimeout,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Make test object
|
||||||
|
fileTime := fstest.Time("2020-03-08T09:30:00.000000000Z")
|
||||||
|
meta := object.NewStaticObjectInfo("upload-timeout.test", fileTime, int64(fileSize), true, nil, nil)
|
||||||
|
data := readers.NewPatternReader(int64(fileSize))
|
||||||
|
|
||||||
|
// Run upload and ensure maximum time
|
||||||
|
done := make(chan bool)
|
||||||
|
deadline := time.After(maxTime)
|
||||||
|
go func() {
|
||||||
|
obj, err = fixFs.Put(ctx, data, meta)
|
||||||
|
done <- true
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-deadline:
|
||||||
|
t.Fatalf("Upload got stuck for %v !", maxTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// non-zero shut_timeout should fix i/o errors
|
||||||
|
obj, err := upload(f.opt.Concurrency, time.Second)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, obj)
|
||||||
|
if obj != nil {
|
||||||
|
_ = obj.Remove(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// InternalTest dispatches all internal tests
|
||||||
|
func (f *Fs) InternalTest(t *testing.T) {
|
||||||
|
t.Run("UploadTimeout", f.testUploadTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ fstests.InternalTester = (*Fs)(nil)
|
Loading…
Reference in New Issue
Block a user