From 04aa6969a47516643cdf9084f9cdeae320a3bb81 Mon Sep 17 00:00:00 2001 From: Haochen Tong Date: Mon, 14 Jun 2021 00:10:15 +0800 Subject: [PATCH] accounting: calculate rolling average speed --- fs/accounting/stats.go | 90 +++++++++++++++++++++++++++++++---- fs/accounting/stats_groups.go | 3 +- go.sum | 10 ---- 3 files changed, 84 insertions(+), 19 deletions(-) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 8c6584d6c..8a5e67f37 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -15,6 +15,11 @@ import ( "github.com/rclone/rclone/lib/terminal" ) +const ( + averagePeriodLength = time.Second + averageStopAfter = time.Minute +) + // MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list var MaxCompletedTransfers = 100 @@ -48,6 +53,18 @@ type StatsInfo struct { oldDuration time.Duration // duration of transfers we have culled group string startTime time.Time // the moment these stats were initialized or reset + average averageValues +} + +type averageValues struct { + mu sync.Mutex + lpBytes int64 + lpTime time.Time + speed float64 + stop chan bool + stopped sync.WaitGroup + startOnce sync.Once + stopOnce sync.Once } // NewStats creates an initialised StatsInfo @@ -60,6 +77,7 @@ func NewStats(ctx context.Context) *StatsInfo { transferring: newTransferMap(ci.Transfers, "transferring"), inProgress: newInProgress(ctx), startTime: time.Now(), + average: averageValues{stop: make(chan bool)}, } } @@ -109,7 +127,7 @@ func (s *StatsInfo) RemoteStats() (out rc.Params, err error) { return out, nil } -// Speed returns the average speed of the transfer in bytes/second +// speed returns the average speed of the transfer in bytes/second // // Call with lock held func (s *StatsInfo) speed() float64 { @@ -274,17 +292,61 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) { // note that s.bytes already includes transferringBytesDone so // we take it off here to avoid double counting ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone - - dt := s.totalDuration() - ts.transferTime = dt.Seconds() - ts.speed = 0.0 - if dt > 0 { - ts.speed = float64(s.bytes) / ts.transferTime - } + ts.speed = s.average.speed return ts } +func (s *StatsInfo) averageLoop() { + var period float64 + + ticker := time.NewTicker(averagePeriodLength) + defer ticker.Stop() + + startTime := time.Now() + a := &s.average + defer a.stopped.Done() + for { + select { + case now := <-ticker.C: + a.mu.Lock() + var elapsed float64 + if a.lpTime.IsZero() { + elapsed = now.Sub(startTime).Seconds() + } else { + elapsed = now.Sub(a.lpTime).Seconds() + } + avg := 0.0 + if elapsed > 0 { + avg = float64(a.lpBytes) / elapsed + } + if period < averagePeriod { + period++ + } + a.speed = (avg + a.speed*(period-1)) / period + a.lpBytes = 0 + a.lpTime = now + a.mu.Unlock() + case <-a.stop: + return + } + } +} + +func (s *StatsInfo) startAverageLoop() { + s.average.startOnce.Do(func() { + s.average.stopped.Add(1) + go s.averageLoop() + }) +} + +func (s *StatsInfo) stopAverageLoop() { + s.average.stopOnce.Do(func() { + close(s.average.stop) + s.average.stopped.Wait() + }) +} + // String convert the StatsInfo to a string for printing func (s *StatsInfo) String() string { // NB if adding more stats in here, remember to add them into @@ -424,6 +486,10 @@ func (s *StatsInfo) Log() { // Bytes updates the stats for bytes bytes func (s *StatsInfo) Bytes(bytes int64) { + s.average.mu.Lock() + s.average.lpBytes += bytes + s.average.mu.Unlock() + s.mu.Lock() defer s.mu.Unlock() s.bytes += bytes @@ -549,6 +615,9 @@ func (s *StatsInfo) ResetCounters() { s.renames = 0 s.startedTransfers = nil s.oldDuration = 0 + + s.stopAverageLoop() + s.average = averageValues{stop: make(chan bool)} } // ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError @@ -629,6 +698,7 @@ func (s *StatsInfo) GetTransfers() int64 { func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { tr := newTransfer(s, obj) s.transferring.add(tr) + s.startAverageLoop() return tr } @@ -636,6 +706,7 @@ func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { tr := newTransferRemoteSize(s, remote, size, false) s.transferring.add(tr) + s.startAverageLoop() return tr } @@ -649,6 +720,9 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) { s.transfers++ s.mu.Unlock() } + if s.transferring.empty() { + time.AfterFunc(averageStopAfter, s.stopAverageLoop) + } } // SetCheckQueue sets the number of queued checks diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go index 9c293e8b8..421fabfc3 100644 --- a/fs/accounting/stats_groups.go +++ b/fs/accounting/stats_groups.go @@ -2,9 +2,10 @@ package accounting import ( "context" - "github.com/rclone/rclone/fs/rc" "sync" + "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/fs" ) diff --git a/go.sum b/go.sum index 84bdc7e9a..cd83a4556 100644 --- a/go.sum +++ b/go.sum @@ -479,8 +479,6 @@ github.com/ncw/swift/v2 v2.0.0/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItc github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nsf/termbox-go v1.1.0 h1:R+GIXVMaDxDQ2VHem5vO5h0mI8ZxLECTUNw1ZzXODzI= -github.com/nsf/termbox-go v1.1.0/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= github.com/nsf/termbox-go v1.1.1-0.20210421210813-2ff630277754 h1:4x51LJd1K+SE/z5cvh711BBMMAS9nHr9V3sXdns3HWQ= github.com/nsf/termbox-go v1.1.1-0.20210421210813-2ff630277754/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= @@ -522,8 +520,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= -github.com/pkg/sftp v1.12.0 h1:/f3b24xrDhkhddlaobPe2JgBqfdt+gC/NYl0QY9IOuI= -github.com/pkg/sftp v1.12.0/go.mod h1:fUqqXB5vEgVCZ131L+9say31RAri6aF6KDViawhxKK8= github.com/pkg/sftp v1.13.1-0.20210424083437-2b80967078b8 h1:0sHotAvxm+h6PnYq2sz+xbbmPyzCMsubDzTMqT1Gbkw= github.com/pkg/sftp v1.13.1-0.20210424083437-2b80967078b8/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -602,7 +598,6 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= -github.com/spacemonkeygo/monkit v0.0.0-20190623001553-09813957f0a8 h1:sO833wsQOMWK0ZW68mWYVhRwlcUaL125bUh1LeR7vxI= github.com/spacemonkeygo/monkit/v3 v3.0.4/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= github.com/spacemonkeygo/monkit/v3 v3.0.7/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= github.com/spacemonkeygo/monkit/v3 v3.0.10/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4= @@ -721,12 +716,9 @@ golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210415154028-4f45737414dc h1:+q90ECDSAQirdykUN6sPEiBXBsp8Csjcca8Oy7bgLTA= -golang.org/x/crypto v0.0.0-20210415154028-4f45737414dc/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b h1:7mWr3k41Qtv8XlltBkDkl8LoP3mpSgBW8BUoxtEdbXg= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -901,8 +893,6 @@ golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210419170143-37df388d1f33 h1:zah5VTTvBlVRELjcDwGLLaWRHZJQsBtplweVYCii0KM= -golang.org/x/sys v0.0.0-20210419170143-37df388d1f33/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=