stats: fix race between ResetCounters and stopAverageLoop called from time.AfterFunc

Before this change StatsInfo.ResetCounters() and stopAverageLoop()
(when called from time.AfterFunc) could race on StatsInfo.average.
This was because the deferred stopAverageLoop accessed
StatsInfo.average without locking.

For some reason this only ever happened on macOS. This caused the CI
to fail on macOS thus causing the macOS builds not to appear.

This commit fixes the problem with a bit of extra locking.

It also renames all StatsInfo methods that should be called without
the lock to start with an initial underscore as this is the convention
we use elsewhere.

Fixes #7567
This commit is contained in:
Nick Craig-Wood 2024-01-16 17:30:24 +00:00
parent d20f647487
commit ae3c73f610
3 changed files with 31 additions and 17 deletions

View File

@ -90,7 +90,7 @@ func (c *RcloneCollector) Collect(ch chan<- prometheus.Metric) {
s.mu.RLock() s.mu.RLock()
ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes)) ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes))
ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s.speed()) ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s._speed())
ch <- prometheus.MustNewConstMetric(c.numOfErrors, prometheus.CounterValue, float64(s.errors)) ch <- prometheus.MustNewConstMetric(c.numOfErrors, prometheus.CounterValue, float64(s.errors))
ch <- prometheus.MustNewConstMetric(c.numOfCheckFiles, prometheus.CounterValue, float64(s.checks)) ch <- prometheus.MustNewConstMetric(c.numOfCheckFiles, prometheus.CounterValue, float64(s.checks))
ch <- prometheus.MustNewConstMetric(c.transferredFiles, prometheus.CounterValue, float64(s.transfers)) ch <- prometheus.MustNewConstMetric(c.transferredFiles, prometheus.CounterValue, float64(s.transfers))

View File

@ -139,10 +139,10 @@ func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
return out, nil return out, nil
} }
// speed returns the average speed of the transfer in bytes/second // _speed returns the average speed of the transfer in bytes/second
// //
// Call with lock held // Call with lock held
func (s *StatsInfo) speed() float64 { func (s *StatsInfo) _speed() float64 {
return s.average.speed return s.average.speed
} }
@ -213,8 +213,9 @@ func (trs timeRanges) total() (total time.Duration) {
// Total duration is union of durations of all transfers belonging to this // Total duration is union of durations of all transfers belonging to this
// object. // object.
//
// Needs to be protected by mutex. // Needs to be protected by mutex.
func (s *StatsInfo) totalDuration() time.Duration { func (s *StatsInfo) _totalDuration() time.Duration {
// copy of s.oldTimeRanges with extra room for the current transfers // copy of s.oldTimeRanges with extra room for the current transfers
timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers)) timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers))
copy(timeRanges, s.oldTimeRanges) copy(timeRanges, s.oldTimeRanges)
@ -313,7 +314,7 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) {
// we take it off here to avoid double counting // we take it off here to avoid double counting
ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone
ts.speed = s.average.speed ts.speed = s.average.speed
dt := s.totalDuration() dt := s._totalDuration()
ts.transferTime = dt.Seconds() ts.transferTime = dt.Seconds()
return ts return ts
@ -355,20 +356,33 @@ func (s *StatsInfo) averageLoop() {
} }
} }
// Start the average loop
func (s *StatsInfo) startAverageLoop() { func (s *StatsInfo) startAverageLoop() {
s.mu.RLock()
defer s.mu.RUnlock()
s.average.startOnce.Do(func() { s.average.startOnce.Do(func() {
s.average.stopped.Add(1) s.average.stopped.Add(1)
go s.averageLoop() go s.averageLoop()
}) })
} }
func (s *StatsInfo) stopAverageLoop() { // Stop the average loop
//
// Call with the mutex held
func (s *StatsInfo) _stopAverageLoop() {
s.average.stopOnce.Do(func() { s.average.stopOnce.Do(func() {
close(s.average.stop) close(s.average.stop)
s.average.stopped.Wait() s.average.stopped.Wait()
}) })
} }
// Stop the average loop
func (s *StatsInfo) stopAverageLoop() {
s.mu.RLock()
defer s.mu.RUnlock()
s._stopAverageLoop()
}
// String convert the StatsInfo to a string for printing // String convert the StatsInfo to a string for printing
func (s *StatsInfo) String() string { func (s *StatsInfo) String() string {
// NB if adding more stats in here, remember to add them into // NB if adding more stats in here, remember to add them into
@ -682,7 +696,7 @@ func (s *StatsInfo) ResetCounters() {
s.startedTransfers = nil s.startedTransfers = nil
s.oldDuration = 0 s.oldDuration = 0
s.stopAverageLoop() s._stopAverageLoop()
s.average = averageValues{stop: make(chan bool)} s.average = averageValues{stop: make(chan bool)}
} }
@ -822,11 +836,11 @@ func (s *StatsInfo) AddTransfer(transfer *Transfer) {
s.mu.Unlock() s.mu.Unlock()
} }
// removeTransfer removes a reference to the started transfer in // _removeTransfer removes a reference to the started transfer in
// position i. // position i.
// //
// Must be called with the lock held // Must be called with the lock held
func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) { func (s *StatsInfo) _removeTransfer(transfer *Transfer, i int) {
now := time.Now() now := time.Now()
// add finished transfer onto old time ranges // add finished transfer onto old time ranges
@ -858,7 +872,7 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
s.mu.Lock() s.mu.Lock()
for i, tr := range s.startedTransfers { for i, tr := range s.startedTransfers {
if tr == transfer { if tr == transfer {
s.removeTransfer(tr, i) s._removeTransfer(tr, i)
break break
} }
} }
@ -876,7 +890,7 @@ func (s *StatsInfo) PruneTransfers() {
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers { if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
for i, tr := range s.startedTransfers { for i, tr := range s.startedTransfers {
if tr.IsDone() { if tr.IsDone() {
s.removeTransfer(tr, i) s._removeTransfer(tr, i)
break break
} }
} }

View File

@ -157,7 +157,7 @@ func TestStatsTotalDuration(t *testing.T) {
s.AddTransfer(tr1) s.AddTransfer(tr1)
s.mu.Lock() s.mu.Lock()
total := s.totalDuration() total := s._totalDuration()
s.mu.Unlock() s.mu.Unlock()
assert.Equal(t, 1, len(s.startedTransfers)) assert.Equal(t, 1, len(s.startedTransfers))
@ -175,7 +175,7 @@ func TestStatsTotalDuration(t *testing.T) {
s.AddTransfer(tr1) s.AddTransfer(tr1)
s.mu.Lock() s.mu.Lock()
total := s.totalDuration() total := s._totalDuration()
s.mu.Unlock() s.mu.Unlock()
assert.Equal(t, time.Since(time1)/time.Second, total/time.Second) assert.Equal(t, time.Since(time1)/time.Second, total/time.Second)
@ -213,7 +213,7 @@ func TestStatsTotalDuration(t *testing.T) {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
s.mu.Lock() s.mu.Lock()
total := s.totalDuration() total := s._totalDuration()
s.mu.Unlock() s.mu.Unlock()
assert.Equal(t, time.Duration(30), total/time.Second) assert.Equal(t, time.Duration(30), total/time.Second)
@ -244,7 +244,7 @@ func TestStatsTotalDuration(t *testing.T) {
}) })
s.mu.Lock() s.mu.Lock()
total := s.totalDuration() total := s._totalDuration()
s.mu.Unlock() s.mu.Unlock()
assert.Equal(t, startTime.Sub(time1)/time.Second, total/time.Second) assert.Equal(t, startTime.Sub(time1)/time.Second, total/time.Second)
@ -449,7 +449,7 @@ func TestPruneTransfers(t *testing.T) {
} }
s.mu.Lock() s.mu.Lock()
assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration())
assert.Equal(t, test.Transfers, len(s.startedTransfers)) assert.Equal(t, test.Transfers, len(s.startedTransfers))
s.mu.Unlock() s.mu.Unlock()
@ -458,7 +458,7 @@ func TestPruneTransfers(t *testing.T) {
} }
s.mu.Lock() s.mu.Lock()
assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration())
assert.Equal(t, test.ExpectedStartedTransfers, len(s.startedTransfers)) assert.Equal(t, test.ExpectedStartedTransfers, len(s.startedTransfers))
s.mu.Unlock() s.mu.Unlock()