diff --git a/docs/content/docs.md b/docs/content/docs.md index 4b15b37b5..c4c1de48d 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -1526,6 +1526,24 @@ of the remote which may be desirable. Setting this to a negative number will make the backlog as large as possible. +### --max-buffer-memory=SIZE {#max-buffer-memory} + +If set, don't allocate more than SIZE amount of memory as buffers. If +not set or set to `0` or `off` this will not limit the amount of memory +in use. + +This includes memory used by buffers created by the `--buffer` flag +and buffers used by multi-thread transfers. + +Most multi-thread transfers do not take additional memory, but some do +depending on the backend (eg the s3 backend for uploads). This means +there is a tension between setting `--transfers` as high as +possible and memory use. + +Setting `--max-buffer-memory` allows the buffer memory to be +controlled so that it doesn't overwhelm the machine and allows +`--transfers` to be set large. + ### --max-delete=N ### This tells rclone not to delete more than N files. If that limit is @@ -1785,6 +1803,14 @@ This will work with the `sync`/`copy`/`move` commands and friends mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or above. +Most multi-thread transfers do not take additional memory, but some do +(for example uploading to s3). In the worst case memory usage can be +at maximum `--transfers` * `--multi-thread-chunk-size` * +`--multi-thread-streams` or specifically for the s3 backend +`--transfers` * `--s3-chunk-size` * `--s3-concurrency`. However you +can use the [--max-buffer-memory](/docs/#max-buffer-memory) flag +to control the maximum memory used here. + **NB** that this **only** works with supported backends as the destination but will work with any backend as the source. diff --git a/docs/content/faq.md b/docs/content/faq.md index 050caf8ef..19eafe6dd 100644 --- a/docs/content/faq.md +++ b/docs/content/faq.md @@ -239,6 +239,10 @@ memory. 
There is [a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files) which involves a bit of scripting. +From v1.70 rclone also has the [--max-buffer-memory](/docs/#max-buffer-memory) +flag which helps particularly when multi-thread transfers are using +too much memory. + ### Rclone changes fullwidth Unicode punctuation marks in file names For example: On a Windows system, you have a file with name `Test:1.jpg`, diff --git a/fs/config.go b/fs/config.go index e83d2cac1..611552ffa 100644 --- a/fs/config.go +++ b/fs/config.go @@ -413,6 +413,11 @@ var ConfigOptionsInfo = Options{{ Default: false, Help: "Use mmap allocator (see docs)", Groups: "Config", +}, { + Name: "max_buffer_memory", + Default: SizeSuffix(-1), + Help: "If set, don't allocate more than this amount of memory as buffers", + Groups: "Config", }, { Name: "ca_cert", Default: []string{}, @@ -613,6 +618,7 @@ type ConfigInfo struct { ProgressTerminalTitle bool `config:"progress_terminal_title"` Cookie bool `config:"use_cookies"` UseMmap bool `config:"use_mmap"` + MaxBufferMemory SizeSuffix `config:"max_buffer_memory"` CaCert []string `config:"ca_cert"` // Client Side CA ClientCert string `config:"client_cert"` // Client Side Cert ClientKey string `config:"client_key"` // Client Side Key diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 4cb4d3a71..b1bd4ab57 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -3,12 +3,14 @@ package pool import ( + "context" "fmt" "sync" "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/lib/mmap" + "golang.org/x/sync/semaphore" ) // Pool of internal buffers @@ -33,6 +35,14 @@ type Pool struct { free func([]byte) error } +// totalMemory is a semaphore used to control total buffer usage of +// all Pools. It may be nil in which case the total buffer usage +// will not be controlled. 
+var totalMemory *semaphore.Weighted + +// Make sure we initialise the totalMemory semaphore once +var totalMemoryInit sync.Once + // New makes a buffer pool // // flushTime is the interval the buffer pools is flushed @@ -145,6 +155,33 @@ func (bp *Pool) updateMinFill() { } } +// acquire mem bytes of memory +func (bp *Pool) acquire(mem int64) error { + ctx := context.Background() + + totalMemoryInit.Do(func() { + ci := fs.GetConfig(ctx) + + // Set max buffer memory limiter + if ci.MaxBufferMemory > 0 { + totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory)) + } + }) + + if totalMemory == nil { + return nil + } + return totalMemory.Acquire(ctx, mem) +} + +// release mem bytes of memory +func (bp *Pool) release(mem int64) { + if totalMemory == nil { + return + } + totalMemory.Release(mem) +} + // Get a buffer from the pool or allocate one func (bp *Pool) Get() []byte { bp.mu.Lock() @@ -156,10 +193,16 @@ func (bp *Pool) Get() []byte { break } else { var err error - buf, err = bp.alloc(bp.bufferSize) + bp.mu.Unlock() + err = bp.acquire(int64(bp.bufferSize)) + bp.mu.Lock() if err == nil { - bp.alloced++ - break + buf, err = bp.alloc(bp.bufferSize) + if err == nil { + bp.alloced++ + break + } + bp.release(int64(bp.bufferSize)) } fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err) bp.mu.Unlock() @@ -179,6 +222,8 @@ func (bp *Pool) freeBuffer(mem []byte) { err := bp.free(mem) if err != nil { fs.Logf(nil, "Failed to free memory: %v", err) + } else { + bp.release(int64(bp.bufferSize)) } bp.alloced-- } diff --git a/lib/pool/pool_test.go b/lib/pool/pool_test.go index 90b0e03fa..27eca3a87 100644 --- a/lib/pool/pool_test.go +++ b/lib/pool/pool_test.go @@ -1,12 +1,15 @@ package pool import ( + "context" "errors" "fmt" "math/rand" + "sync" "testing" "time" + "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/testy" "github.com/stretchr/testify/assert" ) @@ -225,3 +228,55 @@ func TestPool(t *testing.T) { }) } } + +func 
TestPoolMaxBufferMemory(t *testing.T) { + ctx := context.Background() + ci := fs.GetConfig(ctx) + ci.MaxBufferMemory = 4 * 4096 + defer func() { + ci.MaxBufferMemory = 0 + totalMemory = nil + }() + totalMemoryInit = sync.Once{} // reset the sync.Once as it likely has been used + totalMemory = nil + bp := New(60*time.Second, 4096, 2, true) + + assert.Equal(t, bp.alloced, 0) + assert.Nil(t, totalMemory) + buf := bp.Get() + assert.NotNil(t, totalMemory) + bp.Put(buf) + assert.Equal(t, bp.alloced, 1) + + var ( + wg sync.WaitGroup + mu sync.Mutex + bufs int + maxBufs int + countBuf = func(i int) { + mu.Lock() + defer mu.Unlock() + bufs += i + if bufs > maxBufs { + maxBufs = bufs + } + } + ) + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + buf := bp.Get() + countBuf(1) + time.Sleep(100 * time.Millisecond) + bp.Put(buf) + countBuf(-1) + }() + } + + wg.Wait() + + assert.Equal(t, bufs, 0) + assert.Equal(t, maxBufs, 4) + assert.Equal(t, bp.alloced, 2) +}