pool: add --max-buffer-memory to limit total buffer memory usage

Nick Craig-Wood 2025-02-15 18:04:09 +00:00
parent 32147b4bb5
commit 2caabde41a
5 changed files with 139 additions and 3 deletions


@ -1526,6 +1526,24 @@ of the remote which may be desirable.
Setting this to a negative number will make the backlog as large as
possible.

### --max-buffer-memory=SIZE {#max-buffer-memory}

If set, don't allocate more than SIZE amount of memory as buffers. If
not set, or set to `0` or `off`, this will not limit the amount of memory
in use.

This includes memory used by buffers created by the `--buffer` flag
and buffers used by multi-thread transfers.

Most multi-thread transfers do not take additional memory, but some do
depending on the backend (eg the s3 backend for uploads). This means
there is a tension between setting `--transfers` as high as possible
and memory use.

Setting `--max-buffer-memory` allows the buffer memory to be
controlled so that it doesn't overwhelm the machine and allows
`--transfers` to be set large.
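
For example, to run a copy with a large number of transfers while
capping total buffer memory at 1 GiB (an illustrative command; the
remote and paths are placeholders):

    rclone copy -v --transfers 16 --max-buffer-memory 1G /path/to/local remote:bucket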

### --max-delete=N ###

This tells rclone not to delete more than N files. If that limit is
@ -1785,6 +1803,14 @@ This will work with the `sync`/`copy`/`move` commands and friends
mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or
above.

Most multi-thread transfers do not take additional memory, but some do
(for example uploading to s3). In the worst case memory usage can be
at maximum `--transfers` * `--multi-thread-chunk-size` *
`--multi-thread-streams`, or specifically for the s3 backend
`--transfers` * `--s3-chunk-size` * `--s3-concurrency`. However you
can use the [--max-buffer-memory](/docs/#max-buffer-memory) flag
to control the maximum memory used here.
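
As a rough worked example, assuming the current defaults of
`--transfers 4`, `--multi-thread-chunk-size 64M` and
`--multi-thread-streams 4`, the worst case is 4 * 64M * 4 = 1 GiB of
buffer memory, so something like `--max-buffer-memory 256M` would cap
usage at a quarter of that, at the cost of transfers waiting for
buffers to become free.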

**NB** that this **only** works with supported backends as the
destination but will work with any backend as the source.


@ -239,6 +239,10 @@ memory. There is
[a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files)
which involves a bit of scripting.

From v1.70 rclone also has the [--max-buffer-memory](/docs/#max-buffer-memory)
flag which helps particularly when multi-thread transfers are using
too much memory.

### Rclone changes fullwidth Unicode punctuation marks in file names
For example: On a Windows system, you have a file with name `Test:1.jpg`, For example: On a Windows system, you have a file with name `Test:1.jpg`,


@ -413,6 +413,11 @@ var ConfigOptionsInfo = Options{{
	Default: false,
	Help:    "Use mmap allocator (see docs)",
	Groups:  "Config",
}, {
	Name:    "max_buffer_memory",
	Default: SizeSuffix(-1),
	Help:    "If set, don't allocate more than this amount of memory as buffers",
	Groups:  "Config",
}, {
	Name:    "ca_cert",
	Default: []string{},
@ -613,6 +618,7 @@ type ConfigInfo struct {
	ProgressTerminalTitle bool       `config:"progress_terminal_title"`
	Cookie                bool       `config:"use_cookies"`
	UseMmap               bool       `config:"use_mmap"`
	MaxBufferMemory       SizeSuffix `config:"max_buffer_memory"`
	CaCert                []string   `config:"ca_cert"`     // Client Side CA
	ClientCert            string     `config:"client_cert"` // Client Side Cert
	ClientKey             string     `config:"client_key"`  // Client Side Key


@ -3,12 +3,14 @@
package pool

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/lib/mmap"
	"golang.org/x/sync/semaphore"
)

// Pool of internal buffers
@ -33,6 +35,14 @@ type Pool struct {
	free func([]byte) error
}

// totalMemory is a semaphore used to control total buffer usage of
// all Pools. It may be nil, in which case the total buffer usage
// will not be controlled.
var totalMemory *semaphore.Weighted

// Make sure we initialise the totalMemory semaphore once
var totalMemoryInit sync.Once

// New makes a buffer pool
//
// flushTime is the interval the buffer pool is flushed
@ -145,6 +155,33 @@ func (bp *Pool) updateMinFill() {
	}
}

// acquire mem bytes of memory
func (bp *Pool) acquire(mem int64) error {
	ctx := context.Background()
	totalMemoryInit.Do(func() {
		ci := fs.GetConfig(ctx)
		// Set max buffer memory limiter
		if ci.MaxBufferMemory > 0 {
			totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory))
		}
	})
	if totalMemory == nil {
		return nil
	}
	return totalMemory.Acquire(ctx, mem)
}

// release mem bytes of memory
func (bp *Pool) release(mem int64) {
	if totalMemory == nil {
		return
	}
	totalMemory.Release(mem)
}

// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
	bp.mu.Lock()
@ -156,10 +193,16 @@ func (bp *Pool) Get() []byte {
			break
		} else {
			var err error
			bp.mu.Unlock()
			err = bp.acquire(int64(bp.bufferSize))
			bp.mu.Lock()
			if err == nil {
				buf, err = bp.alloc(bp.bufferSize)
				if err == nil {
					bp.alloced++
					break
				}
				bp.release(int64(bp.bufferSize))
			}
			fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
			bp.mu.Unlock()
@ -179,6 +222,8 @@ func (bp *Pool) freeBuffer(mem []byte) {
	err := bp.free(mem)
	if err != nil {
		fs.Logf(nil, "Failed to free memory: %v", err)
	} else {
		bp.release(int64(bp.bufferSize))
	}
	bp.alloced--
}
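
As an illustration, here is a minimal sketch of how a caller might use
the pool under the new limit. It assumes the exported `lib/pool` API
exercised by the test below; the sizes are illustrative, not defaults:

```go
package main

import (
	"time"

	"github.com/rclone/rclone/lib/pool"
)

func main() {
	// 4 KiB buffers, flushed after 60s, keeping at most 2 free buffers cached
	bp := pool.New(60*time.Second, 4096, 2, false)

	// Get blocks until buffer memory is available if --max-buffer-memory
	// would otherwise be exceeded
	buf := bp.Get()
	copy(buf, "hello")

	// Put returns the buffer to the pool; the semaphore is only released
	// when the buffer is actually freed on a pool flush, not here
	bp.Put(buf)
}
```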


@ -1,12 +1,15 @@
package pool

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fstest/testy"
	"github.com/stretchr/testify/assert"
)
@ -225,3 +228,55 @@ func TestPool(t *testing.T) {
		})
	}
}

func TestPoolMaxBufferMemory(t *testing.T) {
	ctx := context.Background()
	ci := fs.GetConfig(ctx)
	ci.MaxBufferMemory = 4 * 4096 // allow at most 4 buffers' worth of memory
	defer func() {
		ci.MaxBufferMemory = 0
		totalMemory = nil
	}()
	totalMemoryInit = sync.Once{} // reset the sync.Once as it likely has been used
	totalMemory = nil

	bp := New(60*time.Second, 4096, 2, true)

	assert.Equal(t, bp.alloced, 0)
	assert.Nil(t, totalMemory)
	buf := bp.Get()
	assert.NotNil(t, totalMemory)
	bp.Put(buf)
	assert.Equal(t, bp.alloced, 1)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		bufs     int
		maxBufs  int
		countBuf = func(i int) {
			mu.Lock()
			defer mu.Unlock()
			bufs += i
			if bufs > maxBufs {
				maxBufs = bufs
			}
		}
	)

	// 20 goroutines compete for buffers, but the limiter should cap
	// the number in use at any one time to 4
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := bp.Get()
			countBuf(1)
			time.Sleep(100 * time.Millisecond)
			bp.Put(buf)
			countBuf(-1)
		}()
	}
	wg.Wait()

	assert.Equal(t, bufs, 0)
	assert.Equal(t, maxBufs, 4)
	assert.Equal(t, bp.alloced, 2)
}