From 2b0a25a64d95edb671bfc31df99577bff3b2faff Mon Sep 17 00:00:00 2001 From: nielash Date: Mon, 11 Mar 2024 19:39:54 -0400 Subject: [PATCH] memory: fix deadlock in operations.Purge Before this change, the Memory backend had the potential to deadlock under certain conditions, if the ListR callback required locking the b.mu mutex. This was the case with operations.Purge, because Memory has no Purge method, and the fallback option does: err = DeleteFiles(ctx, listToChan(ctx, f, dir)) which potentially starts removing objects before the listing has completed. This change fixes the issue by batching all the entries before calling the callback on them. --- backend/memory/memory.go | 19 +++++++++--- backend/memory/memory_internal_test.go | 40 ++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 backend/memory/memory_internal_test.go diff --git a/backend/memory/memory.go b/backend/memory/memory.go index 11c02d78a..608f42292 100644 --- a/backend/memory/memory.go +++ b/backend/memory/memory.go @@ -38,8 +38,7 @@ func init() { } // Options defines the configuration for this backend -type Options struct { -} +type Options struct{} // Fs represents a remote memory server type Fs struct { @@ -385,10 +384,22 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { bucket, directory := f.split(dir) list := walk.NewListRHelper(callback) + entries := fs.DirEntries{} listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, entry fs.DirEntry, isDirectory bool) error { - return list.Add(entry) + err = f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, entry fs.DirEntry, isDirectory bool) error { + entries = append(entries, entry) // can't list.Add here -- could deadlock + return nil }) + if err != nil { + return err + } + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + } + return nil } if bucket == "" { entries, err := f.listBuckets(ctx) diff --git a/backend/memory/memory_internal_test.go b/backend/memory/memory_internal_test.go new file mode 100644 index 000000000..fe8db1f57 --- /dev/null +++ b/backend/memory/memory_internal_test.go @@ -0,0 +1,40 @@ +package memory + +import ( + "context" + "fmt" + "testing" + + _ "github.com/rclone/rclone/backend/local" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" + "github.com/stretchr/testify/require" +) + +var t1 = fstest.Time("2001-02-03T04:05:06.499999999Z") + +// InternalTest dispatches all internal tests +func (f *Fs) InternalTest(t *testing.T) { + t.Run("PurgeListDeadlock", func(t *testing.T) { + testPurgeListDeadlock(t) + }) +} + +// test that Purge fallback does not result in deadlock from concurrently listing and removing +func testPurgeListDeadlock(t *testing.T) { + ctx := context.Background() + r := fstest.NewRunIndividual(t) + r.Mkdir(ctx, r.Fremote) + r.Fremote.Features().Disable("Purge") // force fallback-purge + + // make a lot of files to prevent it from finishing too quickly + for i := 0; i < 100; i++ { + dst := "file" + fmt.Sprint(i) + ".txt" + r.WriteObject(ctx, dst, "hello", t1) + } + + require.NoError(t, operations.Purge(ctx, r.Fremote, "")) +} + +var _ fstests.InternalTester = (*Fs)(nil)