mirror of
https://github.com/rclone/rclone.git
synced 2025-01-08 00:23:49 +08:00
45255bccb3
In 04aa6969a4
we updated the displayed speed to be a rolling
average in core/stats and the progress output but we didn't update the
Prometheus metrics.
This patch updates the Prometheus metrics too.
Fixes #7053
859 lines
22 KiB
Go
859 lines
22 KiB
Go
package accounting
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/fs/rc"
|
|
"github.com/rclone/rclone/lib/terminal"
|
|
)
|
|
|
|
const (
|
|
averagePeriodLength = time.Second
|
|
averageStopAfter = time.Minute
|
|
)
|
|
|
|
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
|
|
var MaxCompletedTransfers = 100
|
|
|
|
// StatsInfo accounts all transfers
|
|
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
|
|
// to correctly count the updated fields
|
|
type StatsInfo struct {
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
ci *fs.ConfigInfo
|
|
bytes int64
|
|
errors int64
|
|
lastError error
|
|
fatalError bool
|
|
retryError bool
|
|
retryAfter time.Time
|
|
checks int64
|
|
checking *transferMap
|
|
checkQueue int
|
|
checkQueueSize int64
|
|
transfers int64
|
|
transferring *transferMap
|
|
transferQueue int
|
|
transferQueueSize int64
|
|
renames int64
|
|
renameQueue int
|
|
renameQueueSize int64
|
|
deletes int64
|
|
deletesSize int64
|
|
deletedDirs int64
|
|
inProgress *inProgress
|
|
startedTransfers []*Transfer // currently active transfers
|
|
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
|
oldDuration time.Duration // duration of transfers we have culled
|
|
group string
|
|
startTime time.Time // the moment these stats were initialized or reset
|
|
average averageValues
|
|
}
|
|
|
|
type averageValues struct {
|
|
mu sync.Mutex
|
|
lpBytes int64
|
|
lpTime time.Time
|
|
speed float64
|
|
stop chan bool
|
|
stopped sync.WaitGroup
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
// NewStats creates an initialised StatsInfo
|
|
func NewStats(ctx context.Context) *StatsInfo {
|
|
ci := fs.GetConfig(ctx)
|
|
return &StatsInfo{
|
|
ctx: ctx,
|
|
ci: ci,
|
|
checking: newTransferMap(ci.Checkers, "checking"),
|
|
transferring: newTransferMap(ci.Transfers, "transferring"),
|
|
inProgress: newInProgress(ctx),
|
|
startTime: time.Now(),
|
|
average: averageValues{stop: make(chan bool)},
|
|
}
|
|
}
|
|
|
|
// RemoteStats returns stats for rc
|
|
func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
|
|
// NB if adding values here - make sure you update the docs in
|
|
// stats_groups.go
|
|
|
|
out = make(rc.Params)
|
|
|
|
ts := s.calculateTransferStats()
|
|
out["totalChecks"] = ts.totalChecks
|
|
out["totalTransfers"] = ts.totalTransfers
|
|
out["totalBytes"] = ts.totalBytes
|
|
out["transferTime"] = ts.transferTime
|
|
out["speed"] = ts.speed
|
|
|
|
s.mu.RLock()
|
|
out["bytes"] = s.bytes
|
|
out["errors"] = s.errors
|
|
out["fatalError"] = s.fatalError
|
|
out["retryError"] = s.retryError
|
|
out["checks"] = s.checks
|
|
out["transfers"] = s.transfers
|
|
out["deletes"] = s.deletes
|
|
out["deletedDirs"] = s.deletedDirs
|
|
out["renames"] = s.renames
|
|
out["elapsedTime"] = time.Since(s.startTime).Seconds()
|
|
eta, etaOK := eta(s.bytes, ts.totalBytes, ts.speed)
|
|
if etaOK {
|
|
out["eta"] = eta.Seconds()
|
|
} else {
|
|
out["eta"] = nil
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
if !s.checking.empty() {
|
|
out["checking"] = s.checking.remotes()
|
|
}
|
|
if !s.transferring.empty() {
|
|
out["transferring"] = s.transferring.rcStats(s.inProgress)
|
|
}
|
|
if s.errors > 0 {
|
|
out["lastError"] = s.lastError.Error()
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// speed returns the average speed of the transfer in bytes/second
|
|
//
|
|
// Call with lock held
|
|
func (s *StatsInfo) speed() float64 {
|
|
return s.average.speed
|
|
}
|
|
|
|
// timeRange is a start and end time of a transfer
|
|
type timeRange struct {
|
|
start time.Time
|
|
end time.Time
|
|
}
|
|
|
|
// timeRanges is a list of non-overlapping start and end times for
|
|
// transfers
|
|
type timeRanges []timeRange
|
|
|
|
// merge all the overlapping time ranges
|
|
func (trs *timeRanges) merge() {
|
|
Trs := *trs
|
|
|
|
// Sort by the starting time.
|
|
sort.Slice(Trs, func(i, j int) bool {
|
|
return Trs[i].start.Before(Trs[j].start)
|
|
})
|
|
|
|
// Merge overlaps and add distinctive ranges together
|
|
var (
|
|
newTrs = Trs[:0]
|
|
i, j = 0, 1
|
|
)
|
|
for i < len(Trs) {
|
|
if j < len(Trs) {
|
|
if !Trs[i].end.Before(Trs[j].start) {
|
|
if Trs[i].end.Before(Trs[j].end) {
|
|
Trs[i].end = Trs[j].end
|
|
}
|
|
j++
|
|
continue
|
|
}
|
|
}
|
|
newTrs = append(newTrs, Trs[i])
|
|
i = j
|
|
j++
|
|
}
|
|
|
|
*trs = newTrs
|
|
}
|
|
|
|
// cull remove any ranges whose start and end are before cutoff
|
|
// returning their duration sum
|
|
func (trs *timeRanges) cull(cutoff time.Time) (d time.Duration) {
|
|
var newTrs = (*trs)[:0]
|
|
for _, tr := range *trs {
|
|
if cutoff.Before(tr.start) || cutoff.Before(tr.end) {
|
|
newTrs = append(newTrs, tr)
|
|
} else {
|
|
d += tr.end.Sub(tr.start)
|
|
}
|
|
}
|
|
*trs = newTrs
|
|
return d
|
|
}
|
|
|
|
// total the time out of the time ranges
|
|
func (trs timeRanges) total() (total time.Duration) {
|
|
for _, tr := range trs {
|
|
total += tr.end.Sub(tr.start)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// Total duration is union of durations of all transfers belonging to this
|
|
// object.
|
|
// Needs to be protected by mutex.
|
|
func (s *StatsInfo) totalDuration() time.Duration {
|
|
// copy of s.oldTimeRanges with extra room for the current transfers
|
|
timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers))
|
|
copy(timeRanges, s.oldTimeRanges)
|
|
|
|
// Extract time ranges of all transfers.
|
|
now := time.Now()
|
|
for i := range s.startedTransfers {
|
|
start, end := s.startedTransfers[i].TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
timeRanges = append(timeRanges, timeRange{start, end})
|
|
}
|
|
|
|
timeRanges.merge()
|
|
return s.oldDuration + timeRanges.total()
|
|
}
|
|
|
|
const (
|
|
etaMaxSeconds = (1<<63 - 1) / int64(time.Second) // Largest possible ETA as number of seconds
|
|
etaMax = time.Duration(etaMaxSeconds) * time.Second // Largest possible ETA, which is in second precision, representing "292y24w3d23h47m16s"
|
|
)
|
|
|
|
// eta returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined 'ok' returns false.
|
|
func eta(size, total int64, rate float64) (eta time.Duration, ok bool) {
|
|
if total <= 0 || size < 0 || rate <= 0 {
|
|
return 0, false
|
|
}
|
|
remaining := total - size
|
|
if remaining < 0 {
|
|
return 0, false
|
|
}
|
|
seconds := int64(float64(remaining) / rate)
|
|
if seconds < 0 {
|
|
// Got Int64 overflow
|
|
eta = etaMax
|
|
} else if seconds >= etaMaxSeconds {
|
|
// Would get Int64 overflow if converting from seconds to Duration (nanoseconds)
|
|
eta = etaMax
|
|
} else {
|
|
eta = time.Duration(seconds) * time.Second
|
|
}
|
|
return eta, true
|
|
}
|
|
|
|
// etaString returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined it returns "-"
|
|
func etaString(done, total int64, rate float64) string {
|
|
d, ok := eta(done, total, rate)
|
|
if !ok {
|
|
return "-"
|
|
}
|
|
if d == etaMax {
|
|
return "-"
|
|
}
|
|
return fs.Duration(d).ShortReadableString()
|
|
}
|
|
|
|
// percent returns a/b as a percentage rounded to the nearest integer
|
|
// as a string
|
|
//
|
|
// if the percentage is invalid it returns "-"
|
|
func percent(a int64, b int64) string {
|
|
if a < 0 || b <= 0 {
|
|
return "-"
|
|
}
|
|
return fmt.Sprintf("%d%%", int(float64(a)*100/float64(b)+0.5))
|
|
}
|
|
|
|
// returned from calculateTransferStats
|
|
type transferStats struct {
|
|
totalChecks int64
|
|
totalTransfers int64
|
|
totalBytes int64
|
|
transferTime float64
|
|
speed float64
|
|
}
|
|
|
|
// calculateTransferStats calculates some additional transfer stats not
|
|
// stored directly in StatsInfo
|
|
func (s *StatsInfo) calculateTransferStats() (ts transferStats) {
|
|
// checking and transferring have their own locking so read
|
|
// here before lock to prevent deadlock on GetBytes
|
|
transferring, checking := s.transferring.count(), s.checking.count()
|
|
transferringBytesDone, transferringBytesTotal := s.transferring.progress(s)
|
|
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
ts.totalChecks = int64(s.checkQueue) + s.checks + int64(checking)
|
|
ts.totalTransfers = int64(s.transferQueue) + s.transfers + int64(transferring)
|
|
// note that s.bytes already includes transferringBytesDone so
|
|
// we take it off here to avoid double counting
|
|
ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone
|
|
ts.speed = s.average.speed
|
|
|
|
return ts
|
|
}
|
|
|
|
func (s *StatsInfo) averageLoop() {
|
|
var period float64
|
|
|
|
ticker := time.NewTicker(averagePeriodLength)
|
|
defer ticker.Stop()
|
|
|
|
startTime := time.Now()
|
|
a := &s.average
|
|
defer a.stopped.Done()
|
|
for {
|
|
select {
|
|
case now := <-ticker.C:
|
|
a.mu.Lock()
|
|
var elapsed float64
|
|
if a.lpTime.IsZero() {
|
|
elapsed = now.Sub(startTime).Seconds()
|
|
} else {
|
|
elapsed = now.Sub(a.lpTime).Seconds()
|
|
}
|
|
avg := 0.0
|
|
if elapsed > 0 {
|
|
avg = float64(a.lpBytes) / elapsed
|
|
}
|
|
if period < averagePeriod {
|
|
period++
|
|
}
|
|
a.speed = (avg + a.speed*(period-1)) / period
|
|
a.lpBytes = 0
|
|
a.lpTime = now
|
|
a.mu.Unlock()
|
|
case <-a.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StatsInfo) startAverageLoop() {
|
|
s.average.startOnce.Do(func() {
|
|
s.average.stopped.Add(1)
|
|
go s.averageLoop()
|
|
})
|
|
}
|
|
|
|
func (s *StatsInfo) stopAverageLoop() {
|
|
s.average.stopOnce.Do(func() {
|
|
close(s.average.stop)
|
|
s.average.stopped.Wait()
|
|
})
|
|
}
|
|
|
|
// String convert the StatsInfo to a string for printing
|
|
func (s *StatsInfo) String() string {
|
|
// NB if adding more stats in here, remember to add them into
|
|
// RemoteStats() too.
|
|
|
|
ts := s.calculateTransferStats()
|
|
|
|
s.mu.RLock()
|
|
|
|
var (
|
|
buf = &bytes.Buffer{}
|
|
xfrchkString = ""
|
|
dateString = ""
|
|
elapsedTime = time.Since(s.startTime)
|
|
elapsedTimeSecondsOnly = elapsedTime.Truncate(time.Second/10) % time.Minute
|
|
displaySpeedString string
|
|
)
|
|
|
|
if s.ci.DataRateUnit == "bits" {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed * 8).BitRateUnit()
|
|
} else {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed).ByteRateUnit()
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = fmt.Fprintf(buf, "\nTransferred: ")
|
|
} else {
|
|
xfrchk := []string{}
|
|
if ts.totalTransfers > 0 && s.transferQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("xfr#%d/%d", s.transfers, ts.totalTransfers))
|
|
}
|
|
if ts.totalChecks > 0 && s.checkQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("chk#%d/%d", s.checks, ts.totalChecks))
|
|
}
|
|
if len(xfrchk) > 0 {
|
|
xfrchkString = fmt.Sprintf(" (%s)", strings.Join(xfrchk, ", "))
|
|
}
|
|
if s.ci.StatsOneLineDate {
|
|
t := time.Now()
|
|
dateString = t.Format(s.ci.StatsOneLineDateFormat) // Including the separator so people can customize it
|
|
}
|
|
}
|
|
|
|
_, _ = fmt.Fprintf(buf, "%s%13s / %s, %s, %s, ETA %s%s",
|
|
dateString,
|
|
fs.SizeSuffix(s.bytes).ByteUnit(),
|
|
fs.SizeSuffix(ts.totalBytes).ByteUnit(),
|
|
percent(s.bytes, ts.totalBytes),
|
|
displaySpeedString,
|
|
etaString(s.bytes, ts.totalBytes, ts.speed),
|
|
xfrchkString,
|
|
)
|
|
|
|
if s.ci.ProgressTerminalTitle {
|
|
// Writes ETA to the terminal title
|
|
terminal.WriteTerminalTitle("ETA: " + etaString(s.bytes, ts.totalBytes, ts.speed))
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = buf.WriteRune('\n')
|
|
errorDetails := ""
|
|
switch {
|
|
case s.fatalError:
|
|
errorDetails = " (fatal error encountered)"
|
|
case s.retryError:
|
|
errorDetails = " (retrying may help)"
|
|
case s.errors != 0:
|
|
errorDetails = " (no need to retry)"
|
|
|
|
}
|
|
|
|
// Add only non zero stats
|
|
if s.errors != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Errors: %10d%s\n",
|
|
s.errors, errorDetails)
|
|
}
|
|
if s.checks != 0 || ts.totalChecks != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Checks: %10d / %d, %s\n",
|
|
s.checks, ts.totalChecks, percent(s.checks, ts.totalChecks))
|
|
}
|
|
if s.deletes != 0 || s.deletedDirs != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Deleted: %10d (files), %d (dirs)\n", s.deletes, s.deletedDirs)
|
|
}
|
|
if s.renames != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Renamed: %10d\n", s.renames)
|
|
}
|
|
if s.transfers != 0 || ts.totalTransfers != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s\n",
|
|
s.transfers, ts.totalTransfers, percent(s.transfers, ts.totalTransfers))
|
|
}
|
|
_, _ = fmt.Fprintf(buf, "Elapsed time: %10ss\n", strings.TrimRight(fs.Duration(elapsedTime.Truncate(time.Minute)).ReadableString(), "0s")+fmt.Sprintf("%.1f", elapsedTimeSecondsOnly.Seconds()))
|
|
}
|
|
|
|
// checking and transferring have their own locking so unlock
|
|
// here to prevent deadlock on GetBytes
|
|
s.mu.RUnlock()
|
|
|
|
// Add per transfer stats if required
|
|
if !s.ci.StatsOneLine {
|
|
if !s.checking.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.ctx, s.inProgress, s.transferring))
|
|
}
|
|
if !s.transferring.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.ctx, s.inProgress, nil))
|
|
}
|
|
}
|
|
|
|
return buf.String()
|
|
}
|
|
|
|
// Transferred returns list of all completed transfers including checked and
|
|
// failed ones.
|
|
func (s *StatsInfo) Transferred() []TransferSnapshot {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
ts := make([]TransferSnapshot, 0, len(s.startedTransfers))
|
|
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
ts = append(ts, tr.Snapshot())
|
|
}
|
|
}
|
|
|
|
return ts
|
|
}
|
|
|
|
// Log outputs the StatsInfo to the log
|
|
func (s *StatsInfo) Log() {
|
|
if s.ci.UseJSONLog {
|
|
out, _ := s.RemoteStats()
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v%v\n", s, fs.LogValueHide("stats", out))
|
|
} else {
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v\n", s)
|
|
}
|
|
|
|
}
|
|
|
|
// Bytes updates the stats for bytes bytes
|
|
func (s *StatsInfo) Bytes(bytes int64) {
|
|
s.average.mu.Lock()
|
|
s.average.lpBytes += bytes
|
|
s.average.mu.Unlock()
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes += bytes
|
|
}
|
|
|
|
// GetBytes returns the number of bytes transferred so far
|
|
func (s *StatsInfo) GetBytes() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.bytes
|
|
}
|
|
|
|
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
|
|
func (s *StatsInfo) GetBytesWithPending() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
pending := int64(0)
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.acc != nil {
|
|
bytes, size := tr.acc.progress()
|
|
if bytes < size {
|
|
pending += size - bytes
|
|
}
|
|
}
|
|
}
|
|
return s.bytes + pending
|
|
}
|
|
|
|
// Errors updates the stats for errors
|
|
func (s *StatsInfo) Errors(errors int64) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors += errors
|
|
}
|
|
|
|
// GetErrors reads the number of errors
|
|
func (s *StatsInfo) GetErrors() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors
|
|
}
|
|
|
|
// GetLastError returns the lastError
|
|
func (s *StatsInfo) GetLastError() error {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.lastError
|
|
}
|
|
|
|
// GetChecks returns the number of checks
|
|
func (s *StatsInfo) GetChecks() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.checks
|
|
}
|
|
|
|
// FatalError sets the fatalError flag
|
|
func (s *StatsInfo) FatalError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.fatalError = true
|
|
}
|
|
|
|
// HadFatalError returns whether there has been at least one FatalError
|
|
func (s *StatsInfo) HadFatalError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.fatalError
|
|
}
|
|
|
|
// RetryError sets the retryError flag
|
|
func (s *StatsInfo) RetryError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.retryError = true
|
|
}
|
|
|
|
// HadRetryError returns whether there has been at least one non-NoRetryError
|
|
func (s *StatsInfo) HadRetryError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryError
|
|
}
|
|
|
|
var (
|
|
errMaxDelete = fserrors.FatalError(errors.New("--max-delete threshold reached"))
|
|
errMaxDeleteSize = fserrors.FatalError(errors.New("--max-delete-size threshold reached"))
|
|
)
|
|
|
|
// DeleteFile updates the stats for deleting a file
|
|
//
|
|
// It may return fatal errors if the threshold for --max-delete or
|
|
// --max-delete-size have been reached.
|
|
func (s *StatsInfo) DeleteFile(ctx context.Context, size int64) error {
|
|
ci := fs.GetConfig(ctx)
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if size < 0 {
|
|
size = 0
|
|
}
|
|
if ci.MaxDelete >= 0 && s.deletes+1 > ci.MaxDelete {
|
|
return errMaxDelete
|
|
}
|
|
if ci.MaxDeleteSize >= 0 && s.deletesSize+size > int64(ci.MaxDeleteSize) {
|
|
return errMaxDeleteSize
|
|
}
|
|
s.deletes++
|
|
s.deletesSize += size
|
|
return nil
|
|
}
|
|
|
|
// GetDeletes returns the number of deletes
|
|
func (s *StatsInfo) GetDeletes() int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.deletes
|
|
}
|
|
|
|
// DeletedDirs updates the stats for deletedDirs
|
|
func (s *StatsInfo) DeletedDirs(deletedDirs int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.deletedDirs += deletedDirs
|
|
return s.deletedDirs
|
|
}
|
|
|
|
// Renames updates the stats for renames
|
|
func (s *StatsInfo) Renames(renames int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.renames += renames
|
|
return s.renames
|
|
}
|
|
|
|
// ResetCounters sets the counters (bytes, checks, errors, transfers, deletes, renames) to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetCounters() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes = 0
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
s.checks = 0
|
|
s.transfers = 0
|
|
s.deletes = 0
|
|
s.deletesSize = 0
|
|
s.deletedDirs = 0
|
|
s.renames = 0
|
|
s.startedTransfers = nil
|
|
s.oldDuration = 0
|
|
|
|
s.stopAverageLoop()
|
|
s.average = averageValues{stop: make(chan bool)}
|
|
}
|
|
|
|
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetErrors() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
}
|
|
|
|
// Errored returns whether there have been any errors
|
|
func (s *StatsInfo) Errored() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors != 0
|
|
}
|
|
|
|
// Error adds a single error into the stats, assigns lastError and eventually sets fatalError or retryError
|
|
func (s *StatsInfo) Error(err error) error {
|
|
if err == nil || fserrors.IsCounted(err) {
|
|
return err
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors++
|
|
s.lastError = err
|
|
err = fserrors.FsError(err)
|
|
fserrors.Count(err)
|
|
switch {
|
|
case fserrors.IsFatalError(err):
|
|
s.fatalError = true
|
|
case fserrors.IsRetryAfterError(err):
|
|
retryAfter := fserrors.RetryAfterErrorTime(err)
|
|
if s.retryAfter.IsZero() || retryAfter.Sub(s.retryAfter) > 0 {
|
|
s.retryAfter = retryAfter
|
|
}
|
|
s.retryError = true
|
|
case !fserrors.IsNoRetryError(err):
|
|
s.retryError = true
|
|
}
|
|
return err
|
|
}
|
|
|
|
// RetryAfter returns the time to retry after if it is set. It will
|
|
// be Zero if it isn't set.
|
|
func (s *StatsInfo) RetryAfter() time.Time {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.retryAfter
|
|
}
|
|
|
|
// NewCheckingTransfer adds a checking transfer to the stats, from the object.
|
|
func (s *StatsInfo) NewCheckingTransfer(obj fs.DirEntry, what string) *Transfer {
|
|
tr := newCheckingTransfer(s, obj, what)
|
|
s.checking.add(tr)
|
|
return tr
|
|
}
|
|
|
|
// DoneChecking removes a check from the stats
|
|
func (s *StatsInfo) DoneChecking(remote string) {
|
|
s.checking.del(remote)
|
|
s.mu.Lock()
|
|
s.checks++
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// GetTransfers reads the number of transfers
|
|
func (s *StatsInfo) GetTransfers() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.transfers
|
|
}
|
|
|
|
// NewTransfer adds a transfer to the stats from the object.
|
|
func (s *StatsInfo) NewTransfer(obj fs.DirEntry) *Transfer {
|
|
tr := newTransfer(s, obj)
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
|
|
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
|
|
tr := newTransferRemoteSize(s, remote, size, false, "")
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// DoneTransferring removes a transfer from the stats
|
|
//
|
|
// if ok is true and it was in the transfermap (to avoid incrementing in case of nested calls, #6213) then it increments the transfers count
|
|
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
|
|
existed := s.transferring.del(remote)
|
|
if ok && existed {
|
|
s.mu.Lock()
|
|
s.transfers++
|
|
s.mu.Unlock()
|
|
}
|
|
if s.transferring.empty() {
|
|
time.AfterFunc(averageStopAfter, s.stopAverageLoop)
|
|
}
|
|
}
|
|
|
|
// SetCheckQueue sets the number of queued checks
|
|
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.checkQueue = n
|
|
s.checkQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetTransferQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.transferQueue = n
|
|
s.transferQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetRenameQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.renameQueue = n
|
|
s.renameQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// AddTransfer adds reference to the started transfer.
|
|
func (s *StatsInfo) AddTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
s.startedTransfers = append(s.startedTransfers, transfer)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// removeTransfer removes a reference to the started transfer in
|
|
// position i.
|
|
//
|
|
// Must be called with the lock held
|
|
func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) {
|
|
now := time.Now()
|
|
|
|
// add finished transfer onto old time ranges
|
|
start, end := transfer.TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
s.oldTimeRanges = append(s.oldTimeRanges, timeRange{start, end})
|
|
s.oldTimeRanges.merge()
|
|
|
|
// remove the found entry
|
|
s.startedTransfers = append(s.startedTransfers[:i], s.startedTransfers[i+1:]...)
|
|
|
|
// Find youngest active transfer
|
|
oldestStart := now
|
|
for i := range s.startedTransfers {
|
|
start, _ := s.startedTransfers[i].TimeRange()
|
|
if start.Before(oldestStart) {
|
|
oldestStart = start
|
|
}
|
|
}
|
|
|
|
// remove old entries older than that
|
|
s.oldDuration += s.oldTimeRanges.cull(oldestStart)
|
|
}
|
|
|
|
// RemoveTransfer removes a reference to the started transfer.
|
|
func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
for i, tr := range s.startedTransfers {
|
|
if tr == transfer {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// PruneTransfers makes sure there aren't too many old transfers by removing
|
|
// single finished transfer.
|
|
func (s *StatsInfo) PruneTransfers() {
|
|
if MaxCompletedTransfers < 0 {
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
// remove a transfer from the start if we are over quota
|
|
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
|
|
for i, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|