memory: fix deadlock in operations.Purge

Before this change, the Memory backend had the potential to deadlock under
certain conditions, if the ListR callback required locking the b.mu mutex. This
was the case with operations.Purge, because Memory has no Purge method, and the
fallback option does:

	err = DeleteFiles(ctx, listToChan(ctx, f, dir))

which potentially starts removing objects before the listing has completed.

This change fixes the issue by batching all the entries before calling the
callback on them.
This commit is contained in:
nielash 2024-03-11 19:39:54 -04:00
parent 2bebbfaded
commit 2b0a25a64d
2 changed files with 55 additions and 4 deletions

View File

@ -38,8 +38,7 @@ func init() {
} }
// Options defines the configuration for this backend // Options defines the configuration for this backend
type Options struct { type Options struct{}
}
// Fs represents a remote memory server // Fs represents a remote memory server
type Fs struct { type Fs struct {
@ -385,10 +384,22 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
bucket, directory := f.split(dir) bucket, directory := f.split(dir)
list := walk.NewListRHelper(callback) list := walk.NewListRHelper(callback)
entries := fs.DirEntries{}
listR := func(bucket, directory, prefix string, addBucket bool) error { listR := func(bucket, directory, prefix string, addBucket bool) error {
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, entry fs.DirEntry, isDirectory bool) error { err = f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, entry fs.DirEntry, isDirectory bool) error {
return list.Add(entry) entries = append(entries, entry) // can't list.Add here -- could deadlock
return nil
}) })
if err != nil {
return err
}
for _, entry := range entries {
err = list.Add(entry)
if err != nil {
return err
}
}
return nil
} }
if bucket == "" { if bucket == "" {
entries, err := f.listBuckets(ctx) entries, err := f.listBuckets(ctx)

View File

@ -0,0 +1,40 @@
package memory
import (
"context"
"fmt"
"testing"
_ "github.com/rclone/rclone/backend/local"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests"
"github.com/stretchr/testify/require"
)
var t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
// InternalTest dispatches all internal tests
func (f *Fs) InternalTest(t *testing.T) {
t.Run("PurgeListDeadlock", func(t *testing.T) {
testPurgeListDeadlock(t)
})
}
// test that Purge fallback does not result in deadlock from concurrently listing and removing
func testPurgeListDeadlock(t *testing.T) {
ctx := context.Background()
r := fstest.NewRunIndividual(t)
r.Mkdir(ctx, r.Fremote)
r.Fremote.Features().Disable("Purge") // force fallback-purge
// make a lot of files to prevent it from finishing too quickly
for i := 0; i < 100; i++ {
dst := "file" + fmt.Sprint(i) + ".txt"
r.WriteObject(ctx, dst, "hello", t1)
}
require.NoError(t, operations.Purge(ctx, r.Fremote, ""))
}
var _ fstests.InternalTester = (*Fs)(nil)