From 01a155fb006c274cfc3ca280a5de32a4f1d783c0 Mon Sep 17 00:00:00 2001 From: Roberto Ricci Date: Fri, 18 Aug 2023 16:56:26 +0200 Subject: [PATCH] fs: use atomic types --- fs/operations/check.go | 66 ++++++++++++++++++------------------- fs/operations/operations.go | 14 ++++---- fs/rc/jobs/job.go | 6 ++-- fs/rc/jobs/job_test.go | 30 ++++++++--------- fs/sync/pipe_test.go | 8 ++--- 5 files changed, 62 insertions(+), 62 deletions(-) diff --git a/fs/operations/check.go b/fs/operations/check.go index ba8c08986..f96efb139 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -51,11 +51,11 @@ type checkMarch struct { ioMu sync.Mutex wg sync.WaitGroup tokens chan struct{} - differences int32 - noHashes int32 - srcFilesMissing int32 - dstFilesMissing int32 - matches int32 + differences atomic.Int32 + noHashes atomic.Int32 + srcFilesMissing atomic.Int32 + dstFilesMissing atomic.Int32 + matches atomic.Int32 opt CheckOpt } @@ -83,8 +83,8 @@ func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) { err := fmt.Errorf("file not in %v", c.opt.Fsrc) fs.Errorf(dst, "%v", err) _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.srcFilesMissing, 1) + c.differences.Add(1) + c.srcFilesMissing.Add(1) c.report(dst, c.opt.MissingOnSrc, '-') case fs.Directory: // Do the same thing to the entire contents of the directory @@ -105,8 +105,8 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { err := fmt.Errorf("file not in %v", c.opt.Fdst) fs.Errorf(src, "%v", err) _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.dstFilesMissing, 1) + c.differences.Add(1) + c.dstFilesMissing.Add(1) c.report(src, c.opt.MissingOnDst, '+') case fs.Directory: // Do the same thing to the entire contents of the directory @@ -157,16 +157,16 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b _ = fs.CountError(err) c.report(src, c.opt.Error, '!') } else if differ { - atomic.AddInt32(&c.differences, 1) + c.differences.Add(1) err := errors.New("files differ") // the checkFn has already logged the reason _ = fs.CountError(err) c.report(src, c.opt.Differ, '*') } else { - atomic.AddInt32(&c.matches, 1) + c.matches.Add(1) c.report(src, c.opt.Match, '=') if noHash { - atomic.AddInt32(&c.noHashes, 1) + c.noHashes.Add(1) fs.Debugf(dstX, "OK - could not check hash") } else { fs.Debugf(dstX, "OK") @@ -177,8 +177,8 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fsrc, c.opt.Fdst) fs.Errorf(src, "%v", err) _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.dstFilesMissing, 1) + c.differences.Add(1) + c.dstFilesMissing.Add(1) c.report(src, c.opt.MissingOnDst, '+') } case fs.Directory: @@ -190,8 +190,8 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fdst, c.opt.Fsrc) fs.Errorf(dst, "%v", err) _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.srcFilesMissing, 1) + c.differences.Add(1) + c.srcFilesMissing.Add(1) c.report(dst, c.opt.MissingOnSrc, '-') default: @@ -235,33 +235,33 @@ func CheckFn(ctx context.Context, opt *CheckOpt) error { } func (c *checkMarch) reportResults(ctx context.Context, err error) error { - if c.dstFilesMissing > 0 { - fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing) + if c.dstFilesMissing.Load() > 0 { + fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing.Load()) } - if c.srcFilesMissing > 0 { + if c.srcFilesMissing.Load() > 0 { entity := "files" if c.opt.Fsrc == nil { entity = "hashes" } - fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing, entity) + fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing.Load(), entity) } fs.Logf(c.opt.Fdst, "%d differences found", accounting.Stats(ctx).GetErrors()) if errs := accounting.Stats(ctx).GetErrors(); errs > 0 { fs.Logf(c.opt.Fdst, "%d errors while checking", errs) } - if c.noHashes > 0 { - fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes) + if c.noHashes.Load() > 0 { + fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes.Load()) } - if c.matches > 0 { - fs.Logf(c.opt.Fdst, "%d matching files", c.matches) + if c.matches.Load() > 0 { + fs.Logf(c.opt.Fdst, "%d matching files", c.matches.Load()) } if err != nil { return err } - if c.differences > 0 { + if c.differences.Load() > 0 { // Return an already counted error so we don't double count this error too - err = fserrors.FsError(fmt.Errorf("%d differences found", c.differences)) + err = fserrors.FsError(fmt.Errorf("%d differences found", c.differences.Load())) fserrors.Count(err) return err } @@ -430,7 +430,7 @@ func CheckSum(ctx context.Context, fsrc, fsum fs.Fs, sumFile string, hashType ha if lastErr == nil { lastErr = err } - atomic.AddInt32(&c.dstFilesMissing, 1) + c.dstFilesMissing.Add(1) c.reportFilename(filename, opt.MissingOnDst, '+') } @@ -457,8 +457,8 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool, err = errors.New("sum not found") _ = fs.CountError(err) fs.Errorf(obj, "%v", err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.srcFilesMissing, 1) + c.differences.Add(1) + c.srcFilesMissing.Add(1) c.report(obj, c.opt.MissingOnSrc, '-') return } @@ -515,12 +515,12 @@ func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj case objHash == "": fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash) fs.Debugf(obj, "%v - could not check hash (%v)", hashType, c.opt.Fdst) - atomic.AddInt32(&c.noHashes, 1) - atomic.AddInt32(&c.matches, 1) + c.noHashes.Add(1) + c.matches.Add(1) c.report(obj, c.opt.Match, '=') case objHash == sumHash: fs.Debugf(obj, "%v = %s OK", hashType, sumHash) - atomic.AddInt32(&c.matches, 1) + c.matches.Add(1) c.report(obj, c.opt.Match, '=') default: err = errors.New("files differ") @@ -528,7 +528,7 @@ func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash) fs.Debugf(obj, "%v = %s (%v)", hashType, objHash, c.opt.Fdst) fs.Errorf(obj, "%v", err) - atomic.AddInt32(&c.differences, 1) + c.differences.Add(1) c.report(obj, c.opt.Differ, '*') } } diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 669f1c156..0d032b5b4 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -764,8 +764,8 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b var wg sync.WaitGroup ci := fs.GetConfig(ctx) wg.Add(ci.Checkers) - var errorCount int32 - var fatalErrorCount int32 + var errorCount atomic.Int32 + var fatalErrorCount atomic.Int32 for i := 0; i < ci.Checkers; i++ { go func() { @@ -773,10 +773,10 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b for dst := range toBeDeleted { err := DeleteFileWithBackupDir(ctx, dst, backupDir) if err != nil { - atomic.AddInt32(&errorCount, 1) + errorCount.Add(1) if fserrors.IsFatalError(err) { fs.Errorf(nil, "Got fatal error on delete: %s", err) - atomic.AddInt32(&fatalErrorCount, 1) + fatalErrorCount.Add(1) return } } @@ -785,9 +785,9 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b } fs.Debugf(nil, "Waiting for deletions to finish") wg.Wait() - if errorCount > 0 { - err := fmt.Errorf("failed to delete %d files", errorCount) - if fatalErrorCount > 0 { + if errorCount.Load() > 0 { + err := fmt.Errorf("failed to delete %d files", errorCount.Load()) + if fatalErrorCount.Load() > 0 { return fserrors.FatalError(err) } return err diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 3444178b0..5523a66d6 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -122,7 +122,7 @@ type Jobs struct { var ( running = newJobs() - jobID = int64(0) + jobID atomic.Int64 executeID = uuid.New().String() ) @@ -141,7 +141,7 @@ func SetOpt(opt *rc.Options) { // SetInitialJobID allows for setting jobID before starting any jobs. func SetInitialJobID(id int64) { - if !atomic.CompareAndSwapInt64(&jobID, 0, id) { + if !jobID.CompareAndSwap(0, id) { panic("Setting jobID is only possible before starting any jobs") } } @@ -264,7 +264,7 @@ var jobKey = jobKeyType{} // NewJob creates a Job and executes it, possibly in the background if _async is set func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) { - id := atomic.AddInt64(&jobID, 1) + id := jobID.Add(1) in = in.Copy() // copy input so we can change it ctx, isAsync, err := getAsync(ctx, in) diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 3fa45a471..6fc4a822c 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -229,7 +229,7 @@ func TestJobRunPanic(t *testing.T) { func TestJobsNewJob(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) jobs := newJobs() job, out, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true}) require.NoError(t, err) @@ -241,7 +241,7 @@ func TestJobsNewJob(t *testing.T) { func TestStartJob(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) job, out, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) assert.Equal(t, rc.Params{"jobid": int64(1)}, out) @@ -249,7 +249,7 @@ func TestStartJob(t *testing.T) { } func TestExecuteJob(t *testing.T) { - jobID = 0 + jobID.Store(0) job, out, err := NewJob(context.Background(), shortFn, rc.Params{}) assert.NoError(t, err) assert.Equal(t, int64(1), job.ID) @@ -258,7 +258,7 @@ func TestExecuteJob(t *testing.T) { func TestExecuteJobWithConfig(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) called := false jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) { ci := fs.GetConfig(ctx) @@ -274,7 +274,7 @@ func TestExecuteJobWithConfig(t *testing.T) { require.NoError(t, err) assert.Equal(t, true, called) // Retest with string parameter - jobID = 0 + jobID.Store(0) called = false _, _, err = NewJob(ctx, jobFn, rc.Params{ "_config": `{"BufferSize": "42M"}`, @@ -289,7 +289,7 @@ func TestExecuteJobWithConfig(t *testing.T) { func TestExecuteJobWithFilter(t *testing.T) { ctx := context.Background() called := false - jobID = 0 + jobID.Store(0) jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) { fi := filter.GetConfig(ctx) assert.Equal(t, fs.SizeSuffix(1024), fi.Opt.MaxSize) @@ -309,7 +309,7 @@ func TestExecuteJobWithFilter(t *testing.T) { func TestExecuteJobWithGroup(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) called := false jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) { called = true @@ -327,7 +327,7 @@ func TestExecuteJobWithGroup(t *testing.T) { func TestExecuteJobErrorPropagation(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) testErr := errors.New("test error") errorFn := func(ctx context.Context, in rc.Params) (out rc.Params, err error) { @@ -339,7 +339,7 @@ func TestExecuteJobErrorPropagation(t *testing.T) { func TestRcJobStatus(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) @@ -367,7 +367,7 @@ func TestRcJobStatus(t *testing.T) { func TestRcJobList(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) @@ -396,7 +396,7 @@ func TestRcJobList(t *testing.T) { func TestRcAsyncJobStop(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) _, _, err := NewJob(ctx, ctxFn, rc.Params{"_async": true}) assert.NoError(t, err) @@ -434,7 +434,7 @@ func TestRcAsyncJobStop(t *testing.T) { func TestRcSyncJobStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { - jobID = 0 + jobID.Store(0) job, out, err := NewJob(ctx, ctxFn, rc.Params{}) assert.Error(t, err) assert.Equal(t, int64(1), job.ID) @@ -477,7 +477,7 @@ func TestRcSyncJobStop(t *testing.T) { func TestRcJobStopGroup(t *testing.T) { ctx := context.Background() - jobID = 0 + jobID.Store(0) _, _, err := NewJob(ctx, ctxFn, rc.Params{ "_async": true, "_group": "myparty", @@ -518,7 +518,7 @@ func TestRcJobStopGroup(t *testing.T) { } func TestOnFinish(t *testing.T) { - jobID = 0 + jobID.Store(0) done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) job, _, err := NewJob(ctx, ctxParmFn(ctx, false), rc.Params{"_async": true}) @@ -538,7 +538,7 @@ func TestOnFinish(t *testing.T) { } func TestOnFinishAlreadyFinished(t *testing.T) { - jobID = 0 + jobID.Store(0) done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go index 2d56506ce..4d9afbe26 100644 --- a/fs/sync/pipe_test.go +++ b/fs/sync/pipe_test.go @@ -112,7 +112,7 @@ func TestPipeConcurrent(t *testing.T) { obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) pair1 := fs.ObjectPair{Src: obj1, Dst: nil} ctx := context.Background() - var count int64 + var count atomic.Int64 for j := 0; j < readWriters; j++ { wg.Add(2) @@ -123,7 +123,7 @@ func TestPipeConcurrent(t *testing.T) { pair2, ok := p.Get(ctx) assert.Equal(t, pair1, pair2) assert.Equal(t, true, ok) - atomic.AddInt64(&count, -1) + count.Add(-1) } }() go func() { @@ -132,13 +132,13 @@ func TestPipeConcurrent(t *testing.T) { // Put an object ok := p.Put(ctx, pair1) assert.Equal(t, true, ok) - atomic.AddInt64(&count, 1) + count.Add(1) } }() } wg.Wait() - assert.Equal(t, int64(0), count) + assert.Equal(t, int64(0), count.Load()) } func TestPipeOrderBy(t *testing.T) {