diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 2f9e525fa..08a2d77a2 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -2,342 +2,16 @@ package accounting import ( - "bytes" "fmt" "io" - "sort" - "strings" "sync" "time" "github.com/VividCortex/ewma" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/asyncreader" - "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6 - "golang.org/x/time/rate" ) -// Globals -var ( - Stats = NewStats() - tokenBucketMu sync.Mutex // protects the token bucket variables - tokenBucket *rate.Limiter - prevTokenBucket = tokenBucket - bwLimitToggledOff = false - currLimitMu sync.Mutex // protects changes to the timeslot - currLimit fs.BwTimeSlot -) - -func init() { - // Set the function pointer up in fs - fs.CountError = Stats.Error -} - -const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request - -// make a new empty token bucket with the bandwidth given -func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter { - newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize) - // empty the bucket - err := newTokenBucket.WaitN(context.Background(), maxBurstSize) - if err != nil { - fs.Errorf(nil, "Failed to empty token bucket: %v", err) - } - return newTokenBucket -} - -// StartTokenBucket starts the token bucket if necessary -func StartTokenBucket() { - currLimitMu.Lock() - currLimit := fs.Config.BwLimit.LimitAt(time.Now()) - currLimitMu.Unlock() - - if currLimit.Bandwidth > 0 { - tokenBucket = newTokenBucket(currLimit.Bandwidth) - fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth) - - // Start the SIGUSR2 signal handler to toggle bandwidth. - // This function does nothing in windows systems. - startSignalHandler() - } -} - -// StartTokenTicker creates a ticker to update the bandwidth limiter every minute. -func StartTokenTicker() { - // If the timetable has a single entry or was not specified, we don't need - // a ticker to update the bandwidth. - if len(fs.Config.BwLimit) <= 1 { - return - } - - ticker := time.NewTicker(time.Minute) - go func() { - for range ticker.C { - limitNow := fs.Config.BwLimit.LimitAt(time.Now()) - currLimitMu.Lock() - - if currLimit.Bandwidth != limitNow.Bandwidth { - tokenBucketMu.Lock() - - // If bwlimit is toggled off, the change should only - // become active on the next toggle, which causes - // an exchange of tokenBucket <-> prevTokenBucket - var targetBucket **rate.Limiter - if bwLimitToggledOff { - targetBucket = &prevTokenBucket - } else { - targetBucket = &tokenBucket - } - - // Set new bandwidth. If unlimited, set tokenbucket to nil. - if limitNow.Bandwidth > 0 { - *targetBucket = newTokenBucket(limitNow.Bandwidth) - if bwLimitToggledOff { - fs.Logf(nil, "Scheduled bandwidth change. "+ - "Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth) - } else { - fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth) - } - } else { - *targetBucket = nil - fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled") - } - - currLimit = limitNow - tokenBucketMu.Unlock() - } - currLimitMu.Unlock() - } - }() -} - -// stringSet holds a set of strings -type stringSet map[string]struct{} - -// inProgress holds a synchronizes map of in progress transfers -type inProgress struct { - mu sync.Mutex - m map[string]*Account -} - -// newInProgress makes a new inProgress object -func newInProgress() *inProgress { - return &inProgress{ - m: make(map[string]*Account, fs.Config.Transfers), - } -} - -// set marks the name as in progress -func (ip *inProgress) set(name string, acc *Account) { - ip.mu.Lock() - defer ip.mu.Unlock() - ip.m[name] = acc -} - -// clear marks the name as no longer in progress -func (ip *inProgress) clear(name string) { - ip.mu.Lock() - defer ip.mu.Unlock() - delete(ip.m, name) -} - -// get gets the account for name, of nil if not found -func (ip *inProgress) get(name string) *Account { - ip.mu.Lock() - defer ip.mu.Unlock() - return ip.m[name] -} - -// Strings returns all the strings in the stringSet -func (ss stringSet) Strings() []string { - strings := make([]string, 0, len(ss)) - for name := range ss { - var out string - if acc := Stats.inProgress.get(name); acc != nil { - out = acc.String() - } else { - out = name - } - strings = append(strings, " * "+out) - } - sorted := sort.StringSlice(strings) - sorted.Sort() - return sorted -} - -// String returns all the file names in the stringSet joined by newline -func (ss stringSet) String() string { - return strings.Join(ss.Strings(), "\n") -} - -// StatsInfo limits and accounts all transfers -type StatsInfo struct { - lock sync.RWMutex - bytes int64 - errors int64 - lastError error - checks int64 - checking stringSet - transfers int64 - transferring stringSet - start time.Time - inProgress *inProgress -} - -// NewStats cretates an initialised StatsInfo -func NewStats() *StatsInfo { - return &StatsInfo{ - checking: make(stringSet, fs.Config.Checkers), - transferring: make(stringSet, fs.Config.Transfers), - start: time.Now(), - inProgress: newInProgress(), - } -} - -// String convert the StatsInfo to a string for printing -func (s *StatsInfo) String() string { - s.lock.RLock() - defer s.lock.RUnlock() - dt := time.Now().Sub(s.start) - dtSeconds := dt.Seconds() - speed := 0.0 - if dt > 0 { - speed = float64(s.bytes) / dtSeconds - } - dtRounded := dt - (dt % (time.Second / 10)) - buf := &bytes.Buffer{} - - if fs.Config.DataRateUnit == "bits" { - speed = speed * 8 - } - - fmt.Fprintf(buf, ` -Transferred: %10s (%s) -Errors: %10d -Checks: %10d -Transferred: %10d -Elapsed time: %10v -`, - fs.SizeSuffix(s.bytes).Unit("Bytes"), fs.SizeSuffix(speed).Unit(strings.Title(fs.Config.DataRateUnit)+"/s"), - s.errors, - s.checks, - s.transfers, - dtRounded) - if len(s.checking) > 0 { - fmt.Fprintf(buf, "Checking:\n%s\n", s.checking) - } - if len(s.transferring) > 0 { - fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring) - } - return buf.String() -} - -// Log outputs the StatsInfo to the log -func (s *StatsInfo) Log() { - fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s) -} - -// Bytes updates the stats for bytes bytes -func (s *StatsInfo) Bytes(bytes int64) { - s.lock.Lock() - defer s.lock.Unlock() - s.bytes += bytes -} - -// Errors updates the stats for errors -func (s *StatsInfo) Errors(errors int64) { - s.lock.Lock() - defer s.lock.Unlock() - s.errors += errors -} - -// GetErrors reads the number of errors -func (s *StatsInfo) GetErrors() int64 { - s.lock.RLock() - defer s.lock.RUnlock() - return s.errors -} - -// GetLastError returns the lastError -func (s *StatsInfo) GetLastError() error { - s.lock.RLock() - defer s.lock.RUnlock() - return s.lastError -} - -// ResetCounters sets the counters (bytes, checks, errors, transfers) to 0 -func (s *StatsInfo) ResetCounters() { - s.lock.RLock() - defer s.lock.RUnlock() - s.bytes = 0 - s.errors = 0 - s.checks = 0 - s.transfers = 0 -} - -// ResetErrors sets the errors count to 0 -func (s *StatsInfo) ResetErrors() { - s.lock.RLock() - defer s.lock.RUnlock() - s.errors = 0 -} - -// Errored returns whether there have been any errors -func (s *StatsInfo) Errored() bool { - s.lock.RLock() - defer s.lock.RUnlock() - return s.errors != 0 -} - -// Error adds a single error into the stats and assigns lastError -func (s *StatsInfo) Error(err error) { - s.lock.Lock() - defer s.lock.Unlock() - s.errors++ - s.lastError = err -} - -// Checking adds a check into the stats -func (s *StatsInfo) Checking(remote string) { - s.lock.Lock() - defer s.lock.Unlock() - s.checking[remote] = struct{}{} -} - -// DoneChecking removes a check from the stats -func (s *StatsInfo) DoneChecking(remote string) { - s.lock.Lock() - defer s.lock.Unlock() - delete(s.checking, remote) - s.checks++ -} - -// GetTransfers reads the number of transfers -func (s *StatsInfo) GetTransfers() int64 { - s.lock.RLock() - defer s.lock.RUnlock() - return s.transfers -} - -// Transferring adds a transfer into the stats -func (s *StatsInfo) Transferring(remote string) { - s.lock.Lock() - defer s.lock.Unlock() - s.transferring[remote] = struct{}{} -} - -// DoneTransferring removes a transfer from the stats -// -// if ok is true then it increments the transfers count -func (s *StatsInfo) DoneTransferring(remote string, ok bool) { - s.lock.Lock() - defer s.lock.Unlock() - delete(s.transferring, remote) - if ok { - s.transfers++ - } -} - // Account limits and accounts for one transfer type Account struct { // The mutex is to make sure Read() and Close() aren't called @@ -483,17 +157,7 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) { Stats.Bytes(int64(n)) - // Get the token bucket in use - tokenBucketMu.Lock() - - // Limit the transfer speed if required - if tokenBucket != nil { - tbErr := tokenBucket.WaitN(context.Background(), n) - if tbErr != nil { - fs.Errorf(nil, "Token bucket error: %v", err) - } - } - tokenBucketMu.Unlock() + limitBandwidth(n) return } diff --git a/fs/accounting/inprogress.go b/fs/accounting/inprogress.go new file mode 100644 index 000000000..2f9e01491 --- /dev/null +++ b/fs/accounting/inprogress.go @@ -0,0 +1,41 @@ +package accounting + +import ( + "sync" + + "github.com/ncw/rclone/fs" +) + +// inProgress holds a synchronized map of in progress transfers +type inProgress struct { + mu sync.Mutex + m map[string]*Account +} + +// newInProgress makes a new inProgress object +func newInProgress() *inProgress { + return &inProgress{ + m: make(map[string]*Account, fs.Config.Transfers), + } +} + +// set marks the name as in progress +func (ip *inProgress) set(name string, acc *Account) { + ip.mu.Lock() + defer ip.mu.Unlock() + ip.m[name] = acc +} + +// clear marks the name as no longer in progress +func (ip *inProgress) clear(name string) { + ip.mu.Lock() + defer ip.mu.Unlock() + delete(ip.m, name) +} + +// get gets the account for name, of nil if not found +func (ip *inProgress) get(name string) *Account { + ip.mu.Lock() + defer ip.mu.Unlock() + return ip.m[name] +} diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go new file mode 100644 index 000000000..7decbb0dd --- /dev/null +++ b/fs/accounting/stats.go @@ -0,0 +1,189 @@ +package accounting + +import ( + "bytes" + "fmt" + "strings" + "sync" + "time" + + "github.com/ncw/rclone/fs" +) + +var ( + // Stats is global statistics counter + Stats = NewStats() +) + +func init() { + // Set the function pointer up in fs + fs.CountError = Stats.Error +} + +// StatsInfo accounts all transfers +type StatsInfo struct { + lock sync.RWMutex + bytes int64 + errors int64 + lastError error + checks int64 + checking stringSet + transfers int64 + transferring stringSet + start time.Time + inProgress *inProgress +} + +// NewStats cretates an initialised StatsInfo +func NewStats() *StatsInfo { + return &StatsInfo{ + checking: make(stringSet, fs.Config.Checkers), + transferring: make(stringSet, fs.Config.Transfers), + start: time.Now(), + inProgress: newInProgress(), + } +} + +// String convert the StatsInfo to a string for printing +func (s *StatsInfo) String() string { + s.lock.RLock() + defer s.lock.RUnlock() + dt := time.Now().Sub(s.start) + dtSeconds := dt.Seconds() + speed := 0.0 + if dt > 0 { + speed = float64(s.bytes) / dtSeconds + } + dtRounded := dt - (dt % (time.Second / 10)) + buf := &bytes.Buffer{} + + if fs.Config.DataRateUnit == "bits" { + speed = speed * 8 + } + + fmt.Fprintf(buf, ` +Transferred: %10s (%s) +Errors: %10d +Checks: %10d +Transferred: %10d +Elapsed time: %10v +`, + fs.SizeSuffix(s.bytes).Unit("Bytes"), fs.SizeSuffix(speed).Unit(strings.Title(fs.Config.DataRateUnit)+"/s"), + s.errors, + s.checks, + s.transfers, + dtRounded) + if len(s.checking) > 0 { + fmt.Fprintf(buf, "Checking:\n%s\n", s.checking) + } + if len(s.transferring) > 0 { + fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring) + } + return buf.String() +} + +// Log outputs the StatsInfo to the log +func (s *StatsInfo) Log() { + fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s) +} + +// Bytes updates the stats for bytes bytes +func (s *StatsInfo) Bytes(bytes int64) { + s.lock.Lock() + defer s.lock.Unlock() + s.bytes += bytes +} + +// Errors updates the stats for errors +func (s *StatsInfo) Errors(errors int64) { + s.lock.Lock() + defer s.lock.Unlock() + s.errors += errors +} + +// GetErrors reads the number of errors +func (s *StatsInfo) GetErrors() int64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.errors +} + +// GetLastError returns the lastError +func (s *StatsInfo) GetLastError() error { + s.lock.RLock() + defer s.lock.RUnlock() + return s.lastError +} + +// ResetCounters sets the counters (bytes, checks, errors, transfers) to 0 +func (s *StatsInfo) ResetCounters() { + s.lock.RLock() + defer s.lock.RUnlock() + s.bytes = 0 + s.errors = 0 + s.checks = 0 + s.transfers = 0 +} + +// ResetErrors sets the errors count to 0 +func (s *StatsInfo) ResetErrors() { + s.lock.RLock() + defer s.lock.RUnlock() + s.errors = 0 +} + +// Errored returns whether there have been any errors +func (s *StatsInfo) Errored() bool { + s.lock.RLock() + defer s.lock.RUnlock() + return s.errors != 0 +} + +// Error adds a single error into the stats and assigns lastError +func (s *StatsInfo) Error(err error) { + s.lock.Lock() + defer s.lock.Unlock() + s.errors++ + s.lastError = err +} + +// Checking adds a check into the stats +func (s *StatsInfo) Checking(remote string) { + s.lock.Lock() + defer s.lock.Unlock() + s.checking[remote] = struct{}{} +} + +// DoneChecking removes a check from the stats +func (s *StatsInfo) DoneChecking(remote string) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.checking, remote) + s.checks++ +} + +// GetTransfers reads the number of transfers +func (s *StatsInfo) GetTransfers() int64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.transfers +} + +// Transferring adds a transfer into the stats +func (s *StatsInfo) Transferring(remote string) { + s.lock.Lock() + defer s.lock.Unlock() + s.transferring[remote] = struct{}{} +} + +// DoneTransferring removes a transfer from the stats +// +// if ok is true then it increments the transfers count +func (s *StatsInfo) DoneTransferring(remote string, ok bool) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.transferring, remote) + if ok { + s.transfers++ + } +} diff --git a/fs/accounting/stringset.go b/fs/accounting/stringset.go new file mode 100644 index 000000000..09b397ab5 --- /dev/null +++ b/fs/accounting/stringset.go @@ -0,0 +1,31 @@ +package accounting + +import ( + "sort" + "strings" +) + +// stringSet holds a set of strings +type stringSet map[string]struct{} + +// Strings returns all the strings in the stringSet +func (ss stringSet) Strings() []string { + strings := make([]string, 0, len(ss)) + for name := range ss { + var out string + if acc := Stats.inProgress.get(name); acc != nil { + out = acc.String() + } else { + out = name + } + strings = append(strings, " * "+out) + } + sorted := sort.StringSlice(strings) + sorted.Sort() + return sorted +} + +// String returns all the file names in the stringSet joined by newline +func (ss stringSet) String() string { + return strings.Join(ss.Strings(), "\n") +} diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go new file mode 100644 index 000000000..02787159f --- /dev/null +++ b/fs/accounting/token_bucket.go @@ -0,0 +1,114 @@ +package accounting + +import ( + "sync" + "time" + + "github.com/ncw/rclone/fs" + "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6 + "golang.org/x/time/rate" +) + +// Globals +var ( + tokenBucketMu sync.Mutex // protects the token bucket variables + tokenBucket *rate.Limiter + prevTokenBucket = tokenBucket + bwLimitToggledOff = false + currLimitMu sync.Mutex // protects changes to the timeslot + currLimit fs.BwTimeSlot +) + +const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request + +// make a new empty token bucket with the bandwidth given +func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter { + newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize) + // empty the bucket + err := newTokenBucket.WaitN(context.Background(), maxBurstSize) + if err != nil { + fs.Errorf(nil, "Failed to empty token bucket: %v", err) + } + return newTokenBucket +} + +// StartTokenBucket starts the token bucket if necessary +func StartTokenBucket() { + currLimitMu.Lock() + currLimit := fs.Config.BwLimit.LimitAt(time.Now()) + currLimitMu.Unlock() + + if currLimit.Bandwidth > 0 { + tokenBucket = newTokenBucket(currLimit.Bandwidth) + fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth) + + // Start the SIGUSR2 signal handler to toggle bandwidth. + // This function does nothing in windows systems. + startSignalHandler() + } +} + +// StartTokenTicker creates a ticker to update the bandwidth limiter every minute. +func StartTokenTicker() { + // If the timetable has a single entry or was not specified, we don't need + // a ticker to update the bandwidth. + if len(fs.Config.BwLimit) <= 1 { + return + } + + ticker := time.NewTicker(time.Minute) + go func() { + for range ticker.C { + limitNow := fs.Config.BwLimit.LimitAt(time.Now()) + currLimitMu.Lock() + + if currLimit.Bandwidth != limitNow.Bandwidth { + tokenBucketMu.Lock() + + // If bwlimit is toggled off, the change should only + // become active on the next toggle, which causes + // an exchange of tokenBucket <-> prevTokenBucket + var targetBucket **rate.Limiter + if bwLimitToggledOff { + targetBucket = &prevTokenBucket + } else { + targetBucket = &tokenBucket + } + + // Set new bandwidth. If unlimited, set tokenbucket to nil. + if limitNow.Bandwidth > 0 { + *targetBucket = newTokenBucket(limitNow.Bandwidth) + if bwLimitToggledOff { + fs.Logf(nil, "Scheduled bandwidth change. "+ + "Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth) + } else { + fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth) + } + } else { + *targetBucket = nil + fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled") + } + + currLimit = limitNow + tokenBucketMu.Unlock() + } + currLimitMu.Unlock() + } + }() +} + +// limitBandwith sleeps for the correct amount of time for the passage +// of n bytes according to the current bandwidth limit +func limitBandwidth(n int) { + tokenBucketMu.Lock() + + // Limit the transfer speed if required + if tokenBucket != nil { + err := tokenBucket.WaitN(context.Background(), n) + if err != nil { + fs.Errorf(nil, "Token bucket error: %v", err) + } + } + + tokenBucketMu.Unlock() +}