dropbox: factor batcher into lib/batcher

Nick Craig-Wood 2023-09-12 16:10:45 +01:00
parent 55d10f4d25
commit b94806a143
5 changed files with 668 additions and 312 deletions


@@ -8,121 +8,19 @@ package dropbox
import (
"context"
-"errors"
"fmt"
-"sync"
-"time"
"github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox/files"
-"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
-"github.com/rclone/rclone/lib/atexit"
)
const (
maxBatchSize = 1000 // max size the batch can be
defaultTimeoutSync = 500 * time.Millisecond // kick off the batch if nothing added for this long (sync)
defaultTimeoutAsync = 10 * time.Second // kick off the batch if nothing added for this long (async)
defaultBatchSizeAsync = 100 // default batch size if async
)
// batcher holds info about the current items waiting for upload
type batcher struct {
f *Fs // Fs this batch is part of
mode string // configured batch mode
size int // maximum size for batch
timeout time.Duration // idle timeout for batch
async bool // whether we are using async batching
in chan batcherRequest // incoming items to batch
closed chan struct{} // close to indicate batcher shut down
atexit atexit.FnHandle // atexit handle
shutOnce sync.Once // make sure we shutdown once only
wg sync.WaitGroup // wait for shutdown
}
// batcherRequest holds an incoming request with a place for a reply
type batcherRequest struct {
commitInfo *files.UploadSessionFinishArg
result chan<- batcherResponse
}
// Return true if batcherRequest is the quit request
func (br *batcherRequest) isQuit() bool {
return br.commitInfo == nil
}
// Send this to get the engine to quit
var quitRequest = batcherRequest{}
// batcherResponse holds a response to be delivered to clients waiting
// for a batch to complete.
type batcherResponse struct {
err error
entry *files.FileMetadata
}
// newBatcher creates a new batcher structure
func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.Duration) (*batcher, error) {
// fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout)
if size > maxBatchSize || size < 0 {
return nil, fmt.Errorf("dropbox: batch size must be < %d and >= 0 - it is currently %d", maxBatchSize, size)
}
async := false
switch mode {
case "sync":
if size <= 0 {
ci := fs.GetConfig(ctx)
size = ci.Transfers
}
if timeout <= 0 {
timeout = defaultTimeoutSync
}
case "async":
if size <= 0 {
size = defaultBatchSizeAsync
}
if timeout <= 0 {
timeout = defaultTimeoutAsync
}
async = true
case "off":
size = 0
default:
return nil, fmt.Errorf("dropbox: batch mode must be sync|async|off not %q", mode)
}
b := &batcher{
f: f,
mode: mode,
size: size,
timeout: timeout,
async: async,
in: make(chan batcherRequest, size),
closed: make(chan struct{}),
}
if b.Batching() {
b.atexit = atexit.Register(b.Shutdown)
b.wg.Add(1)
go b.commitLoop(context.Background())
}
return b, nil
}
// Batching returns true if batching is active
func (b *batcher) Batching() bool {
return b.size > 0
}
// finishBatch commits the batch, returning a batch status to poll or maybe complete
-func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (complete *files.UploadSessionFinishBatchResult, err error) {
+func (f *Fs) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (complete *files.UploadSessionFinishBatchResult, err error) {
var arg = &files.UploadSessionFinishBatchArg{
Entries: items,
}
-err = b.f.pacer.Call(func() (bool, error) {
-complete, err = b.f.srv.UploadSessionFinishBatchV2(arg)
+err = f.pacer.Call(func() (bool, error) {
+complete, err = f.srv.UploadSessionFinishBatchV2(arg)
// If error is insufficient space then don't retry
if e, ok := err.(files.UploadSessionFinishAPIError); ok {
if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
@@ -139,23 +37,10 @@ func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionF
return complete, nil
}
-// commit a batch
-func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []chan<- batcherResponse) (err error) {
-// If commit fails then signal clients if sync
-var signalled = b.async
-defer func() {
-if err != nil && !signalled {
-// Signal to clients that there was an error
-for _, result := range results {
-result <- batcherResponse{err: err}
-}
-}
-}()
-desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path)
-fs.Debugf(b.f, "Committing %s", desc)
+// Called by the batcher to commit a batch
+func (f *Fs) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []*files.FileMetadata, errors []error) (err error) {
// finalise the batch getting either a result or a job id to poll
-complete, err := b.finishBatch(ctx, items)
+complete, err := f.finishBatch(ctx, items)
if err != nil {
return err
}
@@ -166,19 +51,13 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
return fmt.Errorf("expecting %d items in batch but got %d", len(results), len(entries))
}
-// Report results to clients
-var (
-errorTag = ""
-errorCount = 0
-)
+// Format results for return
for i := range results {
item := entries[i]
-resp := batcherResponse{}
if item.Tag == "success" {
-resp.entry = item.Success
+results[i] = item.Success
} else {
-errorCount++
-errorTag = item.Tag
+errorTag := item.Tag
if item.Failure != nil {
errorTag = item.Failure.Tag
if item.Failure.LookupFailed != nil {
@@ -191,112 +70,9 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
errorTag += "/" + item.Failure.PropertiesError.Tag
}
}
-resp.err = fmt.Errorf("batch upload failed: %s", errorTag)
-}
-if !b.async {
-results[i] <- resp
+errors[i] = fmt.Errorf("upload failed: %s", errorTag)
}
}
-// Show signalled so no need to report error to clients from now on
-signalled = true
-// Report an error if any failed in the batch
-if errorTag != "" {
-return fmt.Errorf("batch had %d errors: last error: %s", errorCount, errorTag)
-}
-fs.Debugf(b.f, "Committed %s", desc)
return nil
}
// commitLoop runs the commit engine in the background
func (b *batcher) commitLoop(ctx context.Context) {
var (
items []*files.UploadSessionFinishArg // current batch of uncommitted files
results []chan<- batcherResponse // current batch of clients awaiting results
idleTimer = time.NewTimer(b.timeout)
commit = func() {
err := b.commitBatch(ctx, items, results)
if err != nil {
fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.mode, len(items), err)
}
items, results = nil, nil
}
)
defer b.wg.Done()
defer idleTimer.Stop()
idleTimer.Stop()
outer:
for {
select {
case req := <-b.in:
if req.isQuit() {
break outer
}
items = append(items, req.commitInfo)
results = append(results, req.result)
idleTimer.Stop()
if len(items) >= b.size {
commit()
} else {
idleTimer.Reset(b.timeout)
}
case <-idleTimer.C:
if len(items) > 0 {
fs.Debugf(b.f, "Batch idle for %v so committing", b.timeout)
commit()
}
}
}
// commit any remaining items
if len(items) > 0 {
commit()
}
}
// Shutdown finishes any pending batches then shuts everything down
//
// Can be called from atexit handler
func (b *batcher) Shutdown() {
if !b.Batching() {
return
}
b.shutOnce.Do(func() {
atexit.Unregister(b.atexit)
fs.Infof(b.f, "Committing uploads - please wait...")
// show that batcher is shutting down
close(b.closed)
// quit the commitLoop by sending a quitRequest message
//
// Note that we don't close b.in because that will
// cause write to closed channel in Commit when we are
// exiting due to a signal.
b.in <- quitRequest
b.wg.Wait()
})
}
// Commit commits the file using a batch call, first adding it to the
// batch and then waiting for the batch to complete in a synchronous
// way if async is not set.
func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) {
select {
case <-b.closed:
return nil, fserrors.FatalError(errors.New("batcher is shutting down"))
default:
}
fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path)
resp := make(chan batcherResponse, 1)
b.in <- batcherRequest{
commitInfo: commitInfo,
result: resp,
}
// If running async then don't wait for the result
if b.async {
return nil, nil
}
result := <-resp
return result.entry, result.err
}


@@ -47,6 +47,7 @@ import (
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
+"github.com/rclone/rclone/lib/batcher"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
@@ -121,6 +122,14 @@ var (
// Errors
errNotSupportedInSharedMode = fserrors.NoRetryError(errors.New("not supported in shared files mode"))
+// Configure the batcher
+defaultBatcherOptions = batcher.Options{
+MaxBatchSize: 1000,
+DefaultTimeoutSync: 500 * time.Millisecond,
+DefaultTimeoutAsync: 10 * time.Second,
+DefaultBatchSizeAsync: 100,
+}
)
// Gets an oauth config with the right scopes
@@ -152,7 +161,7 @@ func init() {
},
})
},
-Options: append(oauthutil.SharedOptions, []fs.Option{{
+Options: append(append(oauthutil.SharedOptions, []fs.Option{{
Name: "chunk_size",
Help: fmt.Sprintf(`Upload chunk size (< %v).
@@ -210,68 +219,6 @@ Note that we don't unmount the shared folder afterwards so the
shared folder.`,
Default: false,
Advanced: true,
}, {
Name: "batch_mode",
Help: `Upload file batching sync|async|off.
This sets the batch mode used by rclone.
For full info see [the main docs](https://rclone.org/dropbox/#batch-mode)
This has 3 possible values
- off - no batching
- sync - batch uploads and check completion (default)
- async - batch upload and don't check completion
Rclone will close any outstanding batches when it exits which may make
a delay on quit.
`,
Default: "sync",
Advanced: true,
}, {
Name: "batch_size",
Help: `Max number of files in upload batch.
This sets the batch size of files to upload. It has to be less than 1000.
By default this is 0 which means rclone which calculate the batch size
depending on the setting of batch_mode.
- batch_mode: async - default batch_size is 100
- batch_mode: sync - default batch_size is the same as --transfers
- batch_mode: off - not in use
Rclone will close any outstanding batches when it exits which may make
a delay on quit.
Setting this is a great idea if you are uploading lots of small files
as it will make them a lot quicker. You can use --transfers 32 to
maximise throughput.
`,
Default: 0,
Advanced: true,
}, {
Name: "batch_timeout",
Help: `Max time to allow an idle upload batch before uploading.
If an upload batch is idle for more than this long then it will be
uploaded.
The default for this is 0 which means rclone will choose a sensible
default based on the batch_mode in use.
- batch_mode: async - default batch_timeout is 10s
- batch_mode: sync - default batch_timeout is 500ms
- batch_mode: off - not in use
`,
Default: fs.Duration(0),
Advanced: true,
}, {
Name: "batch_commit_timeout",
Help: `Max time to wait for a batch to finish committing`,
Default: fs.Duration(10 * time.Minute),
Advanced: true,
}, {
Name: "pacer_min_sleep",
Default: defaultMinSleep,
@@ -290,23 +237,22 @@ default based on the batch_mode in use.
encoder.EncodeDel |
encoder.EncodeRightSpace |
encoder.EncodeInvalidUtf8,
-}}...),
+}}...), defaultBatcherOptions.FsOptions("For full info see [the main docs](https://rclone.org/dropbox/#batch-mode)\n\n")...),
})
}
// Options defines the configuration for this backend
type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
Impersonate string `config:"impersonate"`
SharedFiles bool `config:"shared_files"`
SharedFolders bool `config:"shared_folders"`
BatchMode string `config:"batch_mode"`
BatchSize int `config:"batch_size"`
BatchTimeout fs.Duration `config:"batch_timeout"`
-BatchCommitTimeout fs.Duration `config:"batch_commit_timeout"`
AsyncBatch bool `config:"async_batch"`
PacerMinSleep fs.Duration `config:"pacer_min_sleep"`
Enc encoder.MultiEncoder `config:"encoding"`
}
// Fs represents a remote dropbox server
@@ -325,7 +271,7 @@ type Fs struct {
slashRootSlash string // root with "/" prefix and postfix, lowercase
pacer *fs.Pacer // To pace the API calls
ns string // The namespace we are using or "" for none
-batcher *batcher // batch builder
+batcher *batcher.Batcher[*files.UploadSessionFinishArg, *files.FileMetadata]
}
// Object describes a dropbox object
@@ -451,7 +397,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ci: ci,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(opt.PacerMinSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
-f.batcher, err = newBatcher(ctx, f, f.opt.BatchMode, f.opt.BatchSize, time.Duration(f.opt.BatchTimeout))
+batcherOptions := defaultBatcherOptions
+batcherOptions.Mode = f.opt.BatchMode
+batcherOptions.Size = f.opt.BatchSize
+batcherOptions.Timeout = time.Duration(f.opt.BatchTimeout)
+f.batcher, err = batcher.New(ctx, f, f.commitBatch, batcherOptions)
if err != nil {
return nil, err
}
@@ -1722,7 +1672,7 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
// If we are batching then we should have written all the data now
// store the commit info now for a batch commit
if o.fs.batcher.Batching() {
-return o.fs.batcher.Commit(ctx, args)
+return o.fs.batcher.Commit(ctx, o.remote, args)
}
err = o.fs.pacer.Call(func() (bool, error) {

lib/batcher/batcher.go (new file, 282 lines)

@@ -0,0 +1,282 @@
// Package batcher implements a generic batcher.
//
// It uses two types:
//
// Item - the thing to be batched
// Result - the result from the batching
//
// And one function of type CommitBatchFn which is called to do the actual batching.
package batcher
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/atexit"
)
// Options for configuring the batcher
type Options struct {
Mode string // mode of the batcher "sync", "async" or "off"
Size int // size of batch
Timeout time.Duration // timeout before committing the batch
MaxBatchSize int // max size the batch can be
DefaultTimeoutSync time.Duration // default time to kick off the batch if nothing added for this long (sync)
DefaultTimeoutAsync time.Duration // default time to kick off the batch if nothing added for this long (async)
DefaultBatchSizeAsync int // default batch size if async
}
// CommitBatchFn is called to commit a batch of Item and return Result to the callers.
//
// It should commit the batch of items then for each result i (of
// which there should be len(items)) it should set either results[i]
// or errors[i]
type CommitBatchFn[Item, Result any] func(ctx context.Context, items []Item, results []Result, errors []error) (err error)
// Batcher holds info about the current items waiting to be acted on.
type Batcher[Item, Result any] struct {
opt Options // options for configuring the batcher
f any // logging identity for fs.Debugf(f, ...)
commit CommitBatchFn[Item, Result] // User defined function to commit the batch
async bool // whether we are using async batching
in chan request[Item, Result] // incoming items to batch
closed chan struct{} // close to indicate batcher shut down
atexit atexit.FnHandle // atexit handle
shutOnce sync.Once // make sure we shutdown once only
wg sync.WaitGroup // wait for shutdown
}
// request holds an incoming request with a place for a reply
type request[Item, Result any] struct {
item Item
name string
result chan<- response[Result]
quit bool // if set then quit
}
// response holds a response to be delivered to clients waiting
// for a batch to complete.
type response[Result any] struct {
err error
entry Result
}
// New creates a Batcher for Item and Result calling commit to do the actual committing.
func New[Item, Result any](ctx context.Context, f any, commit CommitBatchFn[Item, Result], opt Options) (*Batcher[Item, Result], error) {
// fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout)
if opt.Size > opt.MaxBatchSize || opt.Size < 0 {
return nil, fmt.Errorf("batcher: batch size must be < %d and >= 0 - it is currently %d", opt.MaxBatchSize, opt.Size)
}
async := false
switch opt.Mode {
case "sync":
if opt.Size <= 0 {
ci := fs.GetConfig(ctx)
opt.Size = ci.Transfers
}
if opt.Timeout <= 0 {
opt.Timeout = opt.DefaultTimeoutSync
}
case "async":
if opt.Size <= 0 {
opt.Size = opt.DefaultBatchSizeAsync
}
if opt.Timeout <= 0 {
opt.Timeout = opt.DefaultTimeoutAsync
}
async = true
case "off":
opt.Size = 0
default:
return nil, fmt.Errorf("batcher: batch mode must be sync|async|off not %q", opt.Mode)
}
b := &Batcher[Item, Result]{
opt: opt,
f: f,
commit: commit,
async: async,
in: make(chan request[Item, Result], opt.Size),
closed: make(chan struct{}),
}
if b.Batching() {
b.atexit = atexit.Register(b.Shutdown)
b.wg.Add(1)
go b.commitLoop(context.Background())
}
return b, nil
}
// Batching returns true if batching is active
func (b *Batcher[Item, Result]) Batching() bool {
return b.opt.Size > 0
}
// commit a batch calling the user defined commit function then distributing the results.
func (b *Batcher[Item, Result]) commitBatch(ctx context.Context, requests []request[Item, Result]) (err error) {
// If commit fails then signal clients if sync
var signalled = b.async
defer func() {
if err != nil && !signalled {
// Signal to clients that there was an error
for _, req := range requests {
req.result <- response[Result]{err: err}
}
}
}()
desc := fmt.Sprintf("%s batch length %d starting with: %s", b.opt.Mode, len(requests), requests[0].name)
fs.Debugf(b.f, "Committing %s", desc)
var (
items = make([]Item, len(requests))
results = make([]Result, len(requests))
errors = make([]error, len(requests))
)
for i := range requests {
items[i] = requests[i].item
}
// Commit the batch
err = b.commit(ctx, items, results, errors)
if err != nil {
return err
}
// Report results to clients
var (
lastError error
errorCount = 0
)
for i, req := range requests {
result := results[i]
err := errors[i]
resp := response[Result]{}
if err == nil {
resp.entry = result
} else {
errorCount++
lastError = err
resp.err = fmt.Errorf("batch upload failed: %w", err)
}
if !b.async {
req.result <- resp
}
}
// show signalled so no need to report error to clients from now on
signalled = true
// Report an error if any failed in the batch
if lastError != nil {
return fmt.Errorf("batch had %d errors: last error: %w", errorCount, lastError)
}
fs.Debugf(b.f, "Committed %s", desc)
return nil
}
// commitLoop runs the commit engine in the background
func (b *Batcher[Item, Result]) commitLoop(ctx context.Context) {
var (
requests []request[Item, Result] // current batch of uncommitted Items
idleTimer = time.NewTimer(b.opt.Timeout)
commit = func() {
err := b.commitBatch(ctx, requests)
if err != nil {
fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.opt.Mode, len(requests), err)
}
requests = nil
}
)
defer b.wg.Done()
defer idleTimer.Stop()
idleTimer.Stop()
outer:
for {
select {
case req := <-b.in:
if req.quit {
break outer
}
requests = append(requests, req)
idleTimer.Stop()
if len(requests) >= b.opt.Size {
commit()
} else {
idleTimer.Reset(b.opt.Timeout)
}
case <-idleTimer.C:
if len(requests) > 0 {
fs.Debugf(b.f, "Batch idle for %v so committing", b.opt.Timeout)
commit()
}
}
}
// commit any remaining items
if len(requests) > 0 {
commit()
}
}
// Shutdown finishes any pending batches then shuts everything down.
//
// This is registered as an atexit handler by New.
func (b *Batcher[Item, Result]) Shutdown() {
if !b.Batching() {
return
}
b.shutOnce.Do(func() {
atexit.Unregister(b.atexit)
fs.Infof(b.f, "Committing uploads - please wait...")
// show that batcher is shutting down
close(b.closed)
// quit the commitLoop by sending a quitRequest message
//
// Note that we don't close b.in because that will
// cause write to closed channel in Commit when we are
// exiting due to a signal.
b.in <- request[Item, Result]{quit: true}
b.wg.Wait()
})
}
// Commit commits the Item getting a Result or error using a batch
// call, first adding it to the batch and then waiting for the batch
// to complete in a synchronous way if async is not set.
//
// If async is set then this will return no error and a nil/empty
// Result.
//
// This should not be called if batching is off - check first with
// Batching.
func (b *Batcher[Item, Result]) Commit(ctx context.Context, name string, item Item) (entry Result, err error) {
select {
case <-b.closed:
return entry, fserrors.FatalError(errors.New("batcher is shutting down"))
default:
}
fs.Debugf(b.f, "Adding %q to batch", name)
resp := make(chan response[Result], 1)
b.in <- request[Item, Result]{
item: item,
name: name,
result: resp,
}
// If running async then don't wait for the result
if b.async {
return entry, nil
}
result := <-resp
return result.entry, result.err
}
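To see how a backend other than dropbox might drive this new generic API, here is a minimal, self-contained sketch. It is not part of the commit; the package, names and values (commitUploads, the string Item/Result types) are invented for illustration. It mirrors the dropbox wiring above: define a CommitBatchFn, create a Batcher with New, and feed it items through Commit.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/rclone/rclone/lib/batcher"
)

// commitUploads satisfies CommitBatchFn[string, string]: it must fill in
// results[i] or errors[i] for every items[i].
func commitUploads(ctx context.Context, items []string, results []string, errs []error) error {
	for i, item := range items {
		results[i] = strings.ToUpper(item) // stand-in for a real remote call
	}
	return nil
}

func main() {
	ctx := context.Background()
	opt := batcher.Options{
		Mode:                  "sync", // sync|async|off
		Size:                  0,      // 0 => --transfers is used in sync mode
		Timeout:               0,      // 0 => DefaultTimeoutSync below
		MaxBatchSize:          1000,
		DefaultTimeoutSync:    500 * time.Millisecond,
		DefaultTimeoutAsync:   10 * time.Second,
		DefaultBatchSizeAsync: 100,
	}
	b, err := batcher.New[string, string](ctx, nil, commitUploads, opt)
	if err != nil {
		panic(err)
	}
	defer b.Shutdown() // flushes any pending batch, like the atexit handler

	if b.Batching() {
		// In sync mode Commit blocks until the whole batch has committed.
		result, err := b.Commit(ctx, "hello.txt", "hello")
		fmt.Println(result, err) // "HELLO" <nil>
	}
}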

lib/batcher/batcher_test.go (new file, 275 lines)

@@ -0,0 +1,275 @@
package batcher
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type (
Result string
Item string
)
func TestBatcherNew(t *testing.T) {
ctx := context.Background()
ci := fs.GetConfig(ctx)
opt := Options{
Mode: "async",
Size: 100,
Timeout: 1 * time.Second,
MaxBatchSize: 1000,
DefaultTimeoutSync: 500 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 100,
}
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
return nil
}
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
require.True(t, b.Batching())
b.Shutdown()
opt.Mode = "sync"
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
require.True(t, b.Batching())
b.Shutdown()
opt.Mode = "off"
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
require.False(t, b.Batching())
b.Shutdown()
opt.Mode = "bad"
_, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.ErrorContains(t, err, "batch mode")
opt.Mode = "async"
opt.Size = opt.MaxBatchSize + 1
_, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.ErrorContains(t, err, "batch size")
opt.Mode = "sync"
opt.Size = 0
opt.Timeout = 0
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
assert.Equal(t, ci.Transfers, b.opt.Size)
assert.Equal(t, opt.DefaultTimeoutSync, b.opt.Timeout)
b.Shutdown()
opt.Mode = "async"
opt.Size = 0
opt.Timeout = 0
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
assert.Equal(t, opt.DefaultBatchSizeAsync, b.opt.Size)
assert.Equal(t, opt.DefaultTimeoutAsync, b.opt.Timeout)
b.Shutdown()
// Check we get an error on commit
_, err = b.Commit(ctx, "last", Item("last"))
require.ErrorContains(t, err, "shutting down")
}
func TestBatcherCommit(t *testing.T) {
ctx := context.Background()
opt := Options{
Mode: "sync",
Size: 3,
Timeout: 1 * time.Second,
MaxBatchSize: 1000,
DefaultTimeoutSync: 500 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 100,
}
var wg sync.WaitGroup
errFail := errors.New("fail")
var commits int
var totalSize int
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
commits += 1
totalSize += len(items)
for i := range items {
if items[i] == "5" {
errors[i] = errFail
} else {
results[i] = Result(items[i]) + " result"
}
}
return nil
}
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
defer b.Shutdown()
for i := 0; i < 10; i++ {
wg.Add(1)
s := fmt.Sprintf("%d", i)
go func() {
defer wg.Done()
result, err := b.Commit(ctx, s, Item(s))
if s == "5" {
assert.True(t, errors.Is(err, errFail))
} else {
require.NoError(t, err)
assert.Equal(t, Result(s+" result"), result)
}
}()
}
wg.Wait()
assert.Equal(t, 4, commits)
assert.Equal(t, 10, totalSize)
}
func TestBatcherCommitFail(t *testing.T) {
ctx := context.Background()
opt := Options{
Mode: "sync",
Size: 3,
Timeout: 1 * time.Second,
MaxBatchSize: 1000,
DefaultTimeoutSync: 500 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 100,
}
var wg sync.WaitGroup
errFail := errors.New("fail")
var commits int
var totalSize int
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
commits += 1
totalSize += len(items)
return errFail
}
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
defer b.Shutdown()
for i := 0; i < 10; i++ {
wg.Add(1)
s := fmt.Sprintf("%d", i)
go func() {
defer wg.Done()
_, err := b.Commit(ctx, s, Item(s))
assert.True(t, errors.Is(err, errFail))
}()
}
wg.Wait()
assert.Equal(t, 4, commits)
assert.Equal(t, 10, totalSize)
}
func TestBatcherCommitShutdown(t *testing.T) {
ctx := context.Background()
opt := Options{
Mode: "sync",
Size: 3,
Timeout: 1 * time.Second,
MaxBatchSize: 1000,
DefaultTimeoutSync: 500 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 100,
}
var wg sync.WaitGroup
var commits int
var totalSize int
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
commits += 1
totalSize += len(items)
for i := range items {
results[i] = Result(items[i])
}
return nil
}
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
for i := 0; i < 10; i++ {
wg.Add(1)
s := fmt.Sprintf("%d", i)
go func() {
defer wg.Done()
result, err := b.Commit(ctx, s, Item(s))
assert.NoError(t, err)
assert.Equal(t, Result(s), result)
}()
}
time.Sleep(100 * time.Millisecond)
b.Shutdown() // shutdown with batches outstanding
wg.Wait()
assert.Equal(t, 4, commits)
assert.Equal(t, 10, totalSize)
}
func TestBatcherCommitAsync(t *testing.T) {
ctx := context.Background()
opt := Options{
Mode: "async",
Size: 3,
Timeout: 1 * time.Second,
MaxBatchSize: 1000,
DefaultTimeoutSync: 500 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 100,
}
var wg sync.WaitGroup
errFail := errors.New("fail")
var commits atomic.Int32
var totalSize atomic.Int32
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
wg.Add(1)
defer wg.Done()
// t.Logf("commit %d", len(items))
commits.Add(1)
totalSize.Add(int32(len(items)))
for i := range items {
if items[i] == "5" {
errors[i] = errFail
} else {
results[i] = Result(items[i]) + " result"
}
}
return nil
}
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
require.NoError(t, err)
defer b.Shutdown()
for i := 0; i < 10; i++ {
wg.Add(1)
s := fmt.Sprintf("%d", i)
go func() {
defer wg.Done()
result, err := b.Commit(ctx, s, Item(s))
// Async just returns straight away
require.NoError(t, err)
assert.Equal(t, Result(""), result)
}()
}
time.Sleep(2 * time.Second) // wait for batch timeout - needed with async
wg.Wait()
assert.Equal(t, int32(4), commits.Load())
assert.Equal(t, int32(10), totalSize.Load())
}

lib/batcher/options.go (new file, 73 lines)

@@ -0,0 +1,73 @@
package batcher
import (
"fmt"
"time"
"github.com/rclone/rclone/fs"
)
// FsOptions returns the batch mode fs.Options
func (opt *Options) FsOptions(extra string) []fs.Option {
return []fs.Option{{
Name: "batch_mode",
Help: fmt.Sprintf(`Upload file batching sync|async|off.
This sets the batch mode used by rclone.
%sThis has 3 possible values
- off - no batching
- sync - batch uploads and check completion (default)
- async - batch upload and don't check completion
Rclone will close any outstanding batches when it exits which may make
a delay on quit.
`, extra),
Default: "sync",
Advanced: true,
}, {
Name: "batch_size",
Help: fmt.Sprintf(`Max number of files in upload batch.
This sets the batch size of files to upload. It has to be less than %d.
By default this is 0 which means rclone will calculate the batch size
depending on the setting of batch_mode.
- batch_mode: async - default batch_size is %d
- batch_mode: sync - default batch_size is the same as --transfers
- batch_mode: off - not in use
Rclone will close any outstanding batches when it exits which may make
a delay on quit.
Setting this is a great idea if you are uploading lots of small files
as it will make them a lot quicker. You can use --transfers 32 to
maximise throughput.
`, opt.MaxBatchSize, opt.DefaultBatchSizeAsync),
Default: 0,
Advanced: true,
}, {
Name: "batch_timeout",
Help: fmt.Sprintf(`Max time to allow an idle upload batch before uploading.
If an upload batch is idle for more than this long then it will be
uploaded.
The default for this is 0 which means rclone will choose a sensible
default based on the batch_mode in use.
- batch_mode: async - default batch_timeout is %v
- batch_mode: sync - default batch_timeout is %v
- batch_mode: off - not in use
`, opt.DefaultTimeoutAsync, opt.DefaultTimeoutSync),
Default: fs.Duration(0),
Advanced: true,
}, {
Name: "batch_commit_timeout",
Help: `Max time to wait for a batch to finish committing`,
Default: fs.Duration(10 * time.Minute),
Advanced: true,
}}
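For completeness, a hypothetical sketch (again, not part of this commit) of how another backend could consume FsOptions together with batcher.New, following the same pattern as the dropbox changes above. The example package, its Options fields and the string Item/Result types are assumptions for illustration only.

package example

import (
	"context"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/lib/batcher"
)

// Shared defaults, mirroring defaultBatcherOptions in the dropbox backend.
var defaultBatcherOptions = batcher.Options{
	MaxBatchSize:          1000,
	DefaultTimeoutSync:    500 * time.Millisecond,
	DefaultTimeoutAsync:   10 * time.Second,
	DefaultBatchSizeAsync: 100,
}

// In the backend's init() the batch_* flags would be appended to its own
// option list, e.g.:
//
//	Options: append(backendOptions, defaultBatcherOptions.FsOptions("")...)

// Options holds the values parsed back from the batch_* config keys.
type Options struct {
	BatchMode    string      `config:"batch_mode"`
	BatchSize    int         `config:"batch_size"`
	BatchTimeout fs.Duration `config:"batch_timeout"`
}

// newBatcher shows the NewFs-side wiring: copy the parsed config into the
// shared defaults and hand the result to batcher.New.
func newBatcher(ctx context.Context, f any, opt *Options,
	commit batcher.CommitBatchFn[string, string]) (*batcher.Batcher[string, string], error) {
	batcherOptions := defaultBatcherOptions
	batcherOptions.Mode = opt.BatchMode
	batcherOptions.Size = opt.BatchSize
	batcherOptions.Timeout = time.Duration(opt.BatchTimeout)
	return batcher.New[string, string](ctx, f, commit, batcherOptions)
}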
}