mirror of
https://github.com/rclone/rclone.git
synced 2024-11-22 09:11:11 +08:00
accounting: isolate stats to groups
Introduce stats groups that will isolate accounting for logically different transferring operations. That way multiple accounting operations can be done in parallel without interfering with each other stats. Using groups is optional. There is dedicated global stats that will be used by default if no group is specified. This is operating mode for CLI usage which is just fire and forget operation. For running rclone as rc http server each request will create it's own group. Also there is an option to specify your own group.
This commit is contained in:
parent
be0464f5f1
commit
8243ff8bc8
|
@ -1084,16 +1084,16 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error {
|
|||
go func() {
|
||||
defer wg.Done()
|
||||
for object := range toBeDeleted {
|
||||
accounting.Stats.Checking(object.Name)
|
||||
accounting.Stats(ctx).Checking(object.Name)
|
||||
checkErr(f.deleteByID(object.ID, object.Name))
|
||||
accounting.Stats.DoneChecking(object.Name)
|
||||
accounting.Stats(ctx).DoneChecking(object.Name)
|
||||
}
|
||||
}()
|
||||
}
|
||||
last := ""
|
||||
checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error {
|
||||
if !isDirectory {
|
||||
accounting.Stats.Checking(remote)
|
||||
accounting.Stats(ctx).Checking(remote)
|
||||
if oldOnly && last != remote {
|
||||
if object.Action == "hide" {
|
||||
fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID)
|
||||
|
@ -1109,7 +1109,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error {
|
|||
toBeDeleted <- object
|
||||
}
|
||||
last = remote
|
||||
accounting.Stats.DoneChecking(remote)
|
||||
accounting.Stats(ctx).DoneChecking(remote)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
|
|
|
@ -359,7 +359,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
err = errors.Wrapf(err, "failed to open directory %q", dir)
|
||||
fs.Errorf(dir, "%v", err)
|
||||
if isPerm {
|
||||
accounting.Stats.Error(fserrors.NoRetryError(err))
|
||||
accounting.Stats(ctx).Error(fserrors.NoRetryError(err))
|
||||
err = nil // ignore error but fail sync
|
||||
}
|
||||
return nil, err
|
||||
|
@ -395,7 +395,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
if fierr != nil {
|
||||
err = errors.Wrapf(err, "failed to read directory %q", namepath)
|
||||
fs.Errorf(dir, "%v", fierr)
|
||||
accounting.Stats.Error(fserrors.NoRetryError(fierr)) // fail the sync
|
||||
accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
|
||||
continue
|
||||
}
|
||||
fis = append(fis, fi)
|
||||
|
@ -418,7 +418,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
// Skip bad symlinks
|
||||
err = fserrors.NoRetryError(errors.Wrap(err, "symlink"))
|
||||
fs.Errorf(newRemote, "Listing error: %v", err)
|
||||
accounting.Stats.Error(err)
|
||||
accounting.Stats(ctx).Error(err)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
|
|
28
cmd/cmd.go
28
cmd/cmd.go
|
@ -232,25 +232,25 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
for try := 1; try <= *retries; try++ {
|
||||
err = f()
|
||||
fs.CountError(err)
|
||||
lastErr := accounting.Stats.GetLastError()
|
||||
lastErr := accounting.GlobalStats().GetLastError()
|
||||
if err == nil {
|
||||
err = lastErr
|
||||
}
|
||||
if !Retry || !accounting.Stats.Errored() {
|
||||
if !Retry || !accounting.GlobalStats().Errored() {
|
||||
if try > 1 {
|
||||
fs.Errorf(nil, "Attempt %d/%d succeeded", try, *retries)
|
||||
}
|
||||
break
|
||||
}
|
||||
if accounting.Stats.HadFatalError() {
|
||||
if accounting.GlobalStats().HadFatalError() {
|
||||
fs.Errorf(nil, "Fatal error received - not attempting retries")
|
||||
break
|
||||
}
|
||||
if accounting.Stats.Errored() && !accounting.Stats.HadRetryError() {
|
||||
if accounting.GlobalStats().Errored() && !accounting.GlobalStats().HadRetryError() {
|
||||
fs.Errorf(nil, "Can't retry this error - not attempting retries")
|
||||
break
|
||||
}
|
||||
if retryAfter := accounting.Stats.RetryAfter(); !retryAfter.IsZero() {
|
||||
if retryAfter := accounting.GlobalStats().RetryAfter(); !retryAfter.IsZero() {
|
||||
d := retryAfter.Sub(time.Now())
|
||||
if d > 0 {
|
||||
fs.Logf(nil, "Received retry after error - sleeping until %s (%v)", retryAfter.Format(time.RFC3339Nano), d)
|
||||
|
@ -258,12 +258,12 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
}
|
||||
}
|
||||
if lastErr != nil {
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.Stats.GetErrors(), lastErr)
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.GlobalStats().GetErrors(), lastErr)
|
||||
} else {
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.Stats.GetErrors())
|
||||
fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.GlobalStats().GetErrors())
|
||||
}
|
||||
if try < *retries {
|
||||
accounting.Stats.ResetErrors()
|
||||
accounting.GlobalStats().ResetErrors()
|
||||
}
|
||||
if *retriesInterval > 0 {
|
||||
time.Sleep(*retriesInterval)
|
||||
|
@ -271,7 +271,7 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
}
|
||||
stopStats()
|
||||
if err != nil {
|
||||
nerrs := accounting.Stats.GetErrors()
|
||||
nerrs := accounting.GlobalStats().GetErrors()
|
||||
if nerrs <= 1 {
|
||||
log.Printf("Failed to %s: %v", cmd.Name(), err)
|
||||
} else {
|
||||
|
@ -279,8 +279,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
}
|
||||
resolveExitCode(err)
|
||||
}
|
||||
if showStats && (accounting.Stats.Errored() || *statsInterval > 0) {
|
||||
accounting.Stats.Log()
|
||||
if showStats && (accounting.GlobalStats().Errored() || *statsInterval > 0) {
|
||||
accounting.GlobalStats().Log()
|
||||
}
|
||||
fs.Debugf(nil, "%d go routines active\n", runtime.NumGoroutine())
|
||||
|
||||
|
@ -303,8 +303,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
|||
}
|
||||
}
|
||||
|
||||
if accounting.Stats.Errored() {
|
||||
resolveExitCode(accounting.Stats.GetLastError())
|
||||
if accounting.GlobalStats().Errored() {
|
||||
resolveExitCode(accounting.GlobalStats().GetLastError())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -337,7 +337,7 @@ func StartStats() func() {
|
|||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
accounting.Stats.Log()
|
||||
accounting.GlobalStats().Log()
|
||||
case <-stopStats:
|
||||
ticker.Stop()
|
||||
return
|
||||
|
|
|
@ -93,7 +93,7 @@ func printProgress(logMessage string) {
|
|||
w, h = 80, 25
|
||||
}
|
||||
_ = h
|
||||
stats := strings.TrimSpace(accounting.Stats.String())
|
||||
stats := strings.TrimSpace(accounting.GlobalStats().String())
|
||||
logMessage = strings.TrimSpace(logMessage)
|
||||
|
||||
out := func(s string) {
|
||||
|
|
|
@ -214,7 +214,7 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er
|
|||
}
|
||||
|
||||
// Account the transfer
|
||||
tr := accounting.Stats.NewTransferRemoteSize(path, node.Size())
|
||||
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -313,7 +313,7 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose
|
|||
}
|
||||
|
||||
// Account the transfer
|
||||
tr := accounting.Stats.NewTransferRemoteSize(path, node.Size())
|
||||
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
|
||||
defer tr.Done(nil)
|
||||
|
||||
return node.Size(), handle, nil
|
||||
|
|
|
@ -187,7 +187,7 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string
|
|||
}()
|
||||
|
||||
// Account the transfer
|
||||
tr := accounting.Stats.NewTransfer(obj)
|
||||
tr := accounting.Stats(r.Context()).NewTransfer(obj)
|
||||
defer tr.Done(nil)
|
||||
// FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) {
|
|||
// Serve serves a directory
|
||||
func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) {
|
||||
// Account the transfer
|
||||
tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1)
|
||||
tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1)
|
||||
defer tr.Done(nil)
|
||||
|
||||
fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr)
|
||||
|
|
|
@ -75,7 +75,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) {
|
|||
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
tr := accounting.Stats.NewTransfer(o)
|
||||
tr := accounting.Stats(r.Context()).NewTransfer(o)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
|
|
@ -271,7 +271,7 @@ func (s *server) postObject(w http.ResponseWriter, r *http.Request, remote strin
|
|||
|
||||
_, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now())
|
||||
if err != nil {
|
||||
accounting.Stats.Error(err)
|
||||
accounting.Stats(r.Context()).Error(err)
|
||||
fs.Errorf(remote, "Post request rcat error: %v", err)
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
|
||||
|
|
|
@ -39,3 +39,14 @@ func (ip *inProgress) get(name string) *Account {
|
|||
defer ip.mu.Unlock()
|
||||
return ip.m[name]
|
||||
}
|
||||
|
||||
// merge adds items from another inProgress
|
||||
func (ip *inProgress) merge(m *inProgress) {
|
||||
ip.mu.Lock()
|
||||
defer ip.mu.Unlock()
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for key, val := range m.m {
|
||||
ip.m[key] = val
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package accounting
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -14,60 +13,6 @@ import (
|
|||
"github.com/ncw/rclone/fs/rc"
|
||||
)
|
||||
|
||||
var (
|
||||
// Stats is global statistics counter
|
||||
Stats = NewStats()
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Set the function pointer up in fs
|
||||
fs.CountError = Stats.Error
|
||||
|
||||
rc.Add(rc.Call{
|
||||
Path: "core/stats",
|
||||
Fn: Stats.RemoteStats,
|
||||
Title: "Returns stats about current transfers.",
|
||||
Help: `
|
||||
This returns all available stats
|
||||
|
||||
rclone rc core/stats
|
||||
|
||||
Returns the following values:
|
||||
|
||||
` + "```" + `
|
||||
{
|
||||
"speed": average speed in bytes/sec since start of the process,
|
||||
"bytes": total transferred bytes since the start of the process,
|
||||
"errors": number of errors,
|
||||
"fatalError": whether there has been at least one FatalError,
|
||||
"retryError": whether there has been at least one non-NoRetryError,
|
||||
"checks": number of checked files,
|
||||
"transfers": number of transferred files,
|
||||
"deletes" : number of deleted files,
|
||||
"elapsedTime": time in seconds since the start of the process,
|
||||
"lastError": last occurred error,
|
||||
"transferring": an array of currently active file transfers:
|
||||
[
|
||||
{
|
||||
"bytes": total transferred bytes for this file,
|
||||
"eta": estimated time in seconds until file transfer completion
|
||||
"name": name of the file,
|
||||
"percentage": progress of the file transfer in percent,
|
||||
"speed": speed in bytes/sec,
|
||||
"speedAvg": speed in bytes/sec as an exponentially weighted moving average,
|
||||
"size": size of the file in bytes
|
||||
}
|
||||
],
|
||||
"checking": an array of names of currently active file checks
|
||||
[]
|
||||
}
|
||||
` + "```" + `
|
||||
Values for "transferring", "checking" and "lastError" are only assigned if data is available.
|
||||
The value for "eta" is null if an eta cannot be determined.
|
||||
`,
|
||||
})
|
||||
}
|
||||
|
||||
// StatsInfo accounts all transfers
|
||||
type StatsInfo struct {
|
||||
mu sync.RWMutex
|
||||
|
@ -102,7 +47,7 @@ func NewStats() *StatsInfo {
|
|||
}
|
||||
|
||||
// RemoteStats returns stats for rc
|
||||
func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
|
||||
out = make(rc.Params)
|
||||
s.mu.RLock()
|
||||
dt := s.totalDuration()
|
||||
|
@ -237,7 +182,7 @@ func (s *StatsInfo) String() string {
|
|||
// checking and transferring have their own locking so read
|
||||
// here before lock to prevent deadlock on GetBytes
|
||||
transferring, checking := s.transferring.count(), s.checking.count()
|
||||
transferringBytesDone, transferringBytesTotal := s.transferring.progress()
|
||||
transferringBytesDone, transferringBytesTotal := s.transferring.progress(s)
|
||||
|
||||
s.mu.RLock()
|
||||
|
||||
|
@ -325,10 +270,10 @@ Elapsed time: %10v
|
|||
// Add per transfer stats if required
|
||||
if !fs.Config.StatsOneLine {
|
||||
if !s.checking.empty() {
|
||||
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking)
|
||||
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress))
|
||||
}
|
||||
if !s.transferring.empty() {
|
||||
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring)
|
||||
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
189
fs/accounting/stats_groups.go
Normal file
189
fs/accounting/stats_groups.go
Normal file
|
@ -0,0 +1,189 @@
|
|||
package accounting
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/ncw/rclone/fs/rc"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
)
|
||||
|
||||
const globalStats = "global_stats"
|
||||
|
||||
var groups *statsGroups
|
||||
|
||||
func remoteStats(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
// Check to see if we should filter by group.
|
||||
group, err := in.GetString("group")
|
||||
if rc.NotErrParamNotFound(err) {
|
||||
return rc.Params{}, err
|
||||
}
|
||||
if group != "" {
|
||||
return StatsGroup(group).RemoteStats()
|
||||
}
|
||||
|
||||
return groups.sum().RemoteStats()
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Init stats container
|
||||
groups = newStatsGroups()
|
||||
|
||||
// Set the function pointer up in fs
|
||||
fs.CountError = GlobalStats().Error
|
||||
|
||||
rc.Add(rc.Call{
|
||||
Path: "core/stats",
|
||||
Fn: remoteStats,
|
||||
Title: "Returns stats about current transfers.",
|
||||
Help: `
|
||||
This returns all available stats
|
||||
|
||||
rclone rc core/stats
|
||||
|
||||
Returns the following values:
|
||||
|
||||
` + "```" + `
|
||||
{
|
||||
"speed": average speed in bytes/sec since start of the process,
|
||||
"bytes": total transferred bytes since the start of the process,
|
||||
"errors": number of errors,
|
||||
"fatalError": whether there has been at least one FatalError,
|
||||
"retryError": whether there has been at least one non-NoRetryError,
|
||||
"checks": number of checked files,
|
||||
"transfers": number of transferred files,
|
||||
"deletes" : number of deleted files,
|
||||
"elapsedTime": time in seconds since the start of the process,
|
||||
"lastError": last occurred error,
|
||||
"transferring": an array of currently active file transfers:
|
||||
[
|
||||
{
|
||||
"bytes": total transferred bytes for this file,
|
||||
"eta": estimated time in seconds until file transfer completion
|
||||
"name": name of the file,
|
||||
"percentage": progress of the file transfer in percent,
|
||||
"speed": speed in bytes/sec,
|
||||
"speedAvg": speed in bytes/sec as an exponentially weighted moving average,
|
||||
"size": size of the file in bytes
|
||||
}
|
||||
],
|
||||
"checking": an array of names of currently active file checks
|
||||
[]
|
||||
}
|
||||
` + "```" + `
|
||||
Values for "transferring", "checking" and "lastError" are only assigned if data is available.
|
||||
The value for "eta" is null if an eta cannot be determined.
|
||||
`,
|
||||
})
|
||||
}
|
||||
|
||||
type statsGroupCtx int64
|
||||
|
||||
const statsGroupKey statsGroupCtx = 1
|
||||
|
||||
// WithStatsGroup returns copy of the parent context with assigned group.
|
||||
func WithStatsGroup(parent context.Context, group string) context.Context {
|
||||
return context.WithValue(parent, statsGroupKey, group)
|
||||
}
|
||||
|
||||
// StatsGroupFromContext returns group from the context if it's available.
|
||||
// Returns false if group is empty.
|
||||
func StatsGroupFromContext(ctx context.Context) (string, bool) {
|
||||
statsGroup, ok := ctx.Value(statsGroupKey).(string)
|
||||
if statsGroup == "" {
|
||||
ok = false
|
||||
}
|
||||
return statsGroup, ok
|
||||
}
|
||||
|
||||
// Stats gets stats by extracting group from context.
|
||||
func Stats(ctx context.Context) *StatsInfo {
|
||||
group, ok := StatsGroupFromContext(ctx)
|
||||
if !ok {
|
||||
return GlobalStats()
|
||||
}
|
||||
return StatsGroup(group)
|
||||
}
|
||||
|
||||
// StatsGroup gets stats by group name.
|
||||
func StatsGroup(group string) *StatsInfo {
|
||||
stats := groups.get(group)
|
||||
if stats == nil {
|
||||
return NewStatsGroup(group)
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// GlobalStats returns special stats used for global accounting.
|
||||
func GlobalStats() *StatsInfo {
|
||||
return StatsGroup(globalStats)
|
||||
}
|
||||
|
||||
// NewStatsGroup creates new stats under named group.
|
||||
func NewStatsGroup(group string) *StatsInfo {
|
||||
stats := NewStats()
|
||||
groups.set(group, stats)
|
||||
return stats
|
||||
}
|
||||
|
||||
// statsGroups holds a synchronized map of stats
|
||||
type statsGroups struct {
|
||||
mu sync.Mutex
|
||||
m map[string]*StatsInfo
|
||||
}
|
||||
|
||||
// newStatsGroups makes a new statsGroups object
|
||||
func newStatsGroups() *statsGroups {
|
||||
return &statsGroups{
|
||||
m: make(map[string]*StatsInfo),
|
||||
}
|
||||
}
|
||||
|
||||
// set marks the stats as belonging to a group
|
||||
func (sg *statsGroups) set(group string, acc *StatsInfo) {
|
||||
sg.mu.Lock()
|
||||
defer sg.mu.Unlock()
|
||||
sg.m[group] = acc
|
||||
}
|
||||
|
||||
// clear discards reference to group
|
||||
func (sg *statsGroups) clear(group string) {
|
||||
sg.mu.Lock()
|
||||
defer sg.mu.Unlock()
|
||||
delete(sg.m, group)
|
||||
}
|
||||
|
||||
// get gets the stats for group, or nil if not found
|
||||
func (sg *statsGroups) get(group string) *StatsInfo {
|
||||
sg.mu.Lock()
|
||||
defer sg.mu.Unlock()
|
||||
stats, ok := sg.m[group]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// get gets the stats for group, or nil if not found
|
||||
func (sg *statsGroups) sum() *StatsInfo {
|
||||
sg.mu.Lock()
|
||||
defer sg.mu.Unlock()
|
||||
sum := NewStats()
|
||||
for _, stats := range sg.m {
|
||||
sum.bytes += stats.bytes
|
||||
sum.errors += stats.errors
|
||||
sum.fatalError = sum.fatalError || stats.fatalError
|
||||
sum.retryError = sum.retryError || stats.retryError
|
||||
sum.checks += stats.checks
|
||||
sum.transfers += stats.transfers
|
||||
sum.deletes += stats.deletes
|
||||
sum.checking.merge(stats.checking)
|
||||
sum.transferring.merge(stats.transferring)
|
||||
sum.inProgress.merge(stats.inProgress)
|
||||
if sum.lastError == nil && stats.lastError != nil {
|
||||
sum.lastError = stats.lastError
|
||||
}
|
||||
}
|
||||
return sum
|
||||
}
|
|
@ -38,6 +38,17 @@ func (ss *stringSet) del(remote string) {
|
|||
ss.mu.Unlock()
|
||||
}
|
||||
|
||||
// merge adds items from another set
|
||||
func (ss *stringSet) merge(m *stringSet) {
|
||||
ss.mu.Lock()
|
||||
m.mu.Lock()
|
||||
for item := range m.items {
|
||||
ss.items[item] = struct{}{}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
|
||||
// empty returns whether the set has any items
|
||||
func (ss *stringSet) empty() bool {
|
||||
ss.mu.RLock()
|
||||
|
@ -52,14 +63,14 @@ func (ss *stringSet) count() int {
|
|||
return len(ss.items)
|
||||
}
|
||||
|
||||
// Strings returns all the strings in the stringSet
|
||||
func (ss *stringSet) Strings() []string {
|
||||
// String returns string representation of set items.
|
||||
func (ss *stringSet) String(progress *inProgress) string {
|
||||
ss.mu.RLock()
|
||||
defer ss.mu.RUnlock()
|
||||
strings := make([]string, 0, len(ss.items))
|
||||
strngs := make([]string, 0, len(ss.items))
|
||||
for name := range ss.items {
|
||||
var out string
|
||||
if acc := Stats.inProgress.get(name); acc != nil {
|
||||
if acc := progress.get(name); acc != nil {
|
||||
out = acc.String()
|
||||
} else {
|
||||
out = fmt.Sprintf("%*s: %s",
|
||||
|
@ -68,24 +79,19 @@ func (ss *stringSet) Strings() []string {
|
|||
ss.name,
|
||||
)
|
||||
}
|
||||
strings = append(strings, " * "+out)
|
||||
strngs = append(strngs, " * "+out)
|
||||
}
|
||||
sorted := sort.StringSlice(strings)
|
||||
sorted := sort.StringSlice(strngs)
|
||||
sorted.Sort()
|
||||
return sorted
|
||||
}
|
||||
|
||||
// String returns all the file names in the stringSet joined by newline
|
||||
func (ss *stringSet) String() string {
|
||||
return strings.Join(ss.Strings(), "\n")
|
||||
return strings.Join(sorted, "\n")
|
||||
}
|
||||
|
||||
// progress returns total bytes read as well as the size.
|
||||
func (ss *stringSet) progress() (totalBytes, totalSize int64) {
|
||||
func (ss *stringSet) progress(stats *StatsInfo) (totalBytes, totalSize int64) {
|
||||
ss.mu.RLock()
|
||||
defer ss.mu.RUnlock()
|
||||
for name := range ss.items {
|
||||
if acc := Stats.inProgress.get(name); acc != nil {
|
||||
if acc := stats.inProgress.get(name); acc != nil {
|
||||
bytes, size := acc.progress()
|
||||
if size >= 0 && bytes >= 0 {
|
||||
totalBytes += bytes
|
||||
|
|
|
@ -60,8 +60,8 @@ func TestMultithreadCopy(t *testing.T) {
|
|||
|
||||
src, err := r.Fremote.NewObject(context.Background(), "file1")
|
||||
require.NoError(t, err)
|
||||
accounting.Stats.ResetCounters()
|
||||
tr := accounting.Stats.NewTransfer(src)
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
tr := accounting.GlobalStats().NewTransfer(src)
|
||||
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
|
|
|
@ -251,7 +251,7 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil)
|
|||
// It returns the destination object if possible. Note that this may
|
||||
// be nil.
|
||||
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
||||
tr := accounting.Stats.NewTransfer(src)
|
||||
tr := accounting.Stats(ctx).NewTransfer(src)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -281,13 +281,13 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||
actionTaken = "Copied (server side copy)"
|
||||
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
||||
// Check transfer limit for server side copies
|
||||
if fs.Config.MaxTransfer >= 0 && accounting.Stats.GetBytes() >= int64(fs.Config.MaxTransfer) {
|
||||
if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) {
|
||||
return nil, accounting.ErrorMaxTransferLimitReached
|
||||
}
|
||||
newDst, err = doCopy(ctx, src, remote)
|
||||
if err == nil {
|
||||
dst = newDst
|
||||
accounting.Stats.Bytes(dst.Size()) // account the bytes for the server side transfer
|
||||
accounting.Stats(ctx).Bytes(dst.Size()) // account the bytes for the server side transfer
|
||||
}
|
||||
} else {
|
||||
err = fs.ErrorCantCopy
|
||||
|
@ -428,9 +428,9 @@ func SameObject(src, dst fs.Object) bool {
|
|||
// It returns the destination object if possible. Note that this may
|
||||
// be nil.
|
||||
func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
||||
accounting.Stats.Checking(src.Remote())
|
||||
accounting.Stats(ctx).Checking(src.Remote())
|
||||
defer func() {
|
||||
accounting.Stats.DoneChecking(src.Remote())
|
||||
accounting.Stats(ctx).DoneChecking(src.Remote())
|
||||
}()
|
||||
newDst = dst
|
||||
if fs.Config.DryRun {
|
||||
|
@ -501,8 +501,8 @@ func SuffixName(remote string) string {
|
|||
// If backupDir is set then it moves the file to there instead of
|
||||
// deleting
|
||||
func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) {
|
||||
accounting.Stats.Checking(dst.Remote())
|
||||
numDeletes := accounting.Stats.Deletes(1)
|
||||
accounting.Stats(ctx).Checking(dst.Remote())
|
||||
numDeletes := accounting.Stats(ctx).Deletes(1)
|
||||
if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete {
|
||||
return fserrors.FatalError(errors.New("--max-delete threshold reached"))
|
||||
}
|
||||
|
@ -523,7 +523,7 @@ func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs
|
|||
} else if !fs.Config.DryRun {
|
||||
fs.Infof(dst, actioned)
|
||||
}
|
||||
accounting.Stats.DoneChecking(dst.Remote())
|
||||
accounting.Stats(ctx).DoneChecking(dst.Remote())
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -709,8 +709,8 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
|
|||
|
||||
// check to see if two objects are identical using the check function
|
||||
func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool) {
|
||||
accounting.Stats.Checking(src.Remote())
|
||||
defer accounting.Stats.DoneChecking(src.Remote())
|
||||
accounting.Stats(ctx).Checking(src.Remote())
|
||||
defer accounting.Stats(ctx).DoneChecking(src.Remote())
|
||||
if sizeDiffers(src, dst) {
|
||||
err := errors.Errorf("Sizes differ")
|
||||
fs.Errorf(src, "%v", err)
|
||||
|
@ -797,7 +797,7 @@ func CheckFn(ctx context.Context, fdst, fsrc fs.Fs, check checkFn, oneway bool)
|
|||
fs.Logf(fsrc, "%d files missing", c.srcFilesMissing)
|
||||
}
|
||||
|
||||
fs.Logf(fdst, "%d differences found", accounting.Stats.GetErrors())
|
||||
fs.Logf(fdst, "%d differences found", accounting.Stats(ctx).GetErrors())
|
||||
if c.noHashes > 0 {
|
||||
fs.Logf(fdst, "%d hashes could not be checked", c.noHashes)
|
||||
}
|
||||
|
@ -854,7 +854,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e
|
|||
if err != nil {
|
||||
return true, errors.Wrapf(err, "failed to open %q", dst)
|
||||
}
|
||||
tr1 := accounting.Stats.NewTransfer(dst)
|
||||
tr1 := accounting.Stats(ctx).NewTransfer(dst)
|
||||
defer func() {
|
||||
tr1.Done(err)
|
||||
}()
|
||||
|
@ -864,7 +864,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e
|
|||
if err != nil {
|
||||
return true, errors.Wrapf(err, "failed to open %q", src)
|
||||
}
|
||||
tr2 := accounting.Stats.NewTransfer(dst)
|
||||
tr2 := accounting.Stats(ctx).NewTransfer(dst)
|
||||
defer func() {
|
||||
tr2.Done(err)
|
||||
}()
|
||||
|
@ -930,9 +930,9 @@ func List(ctx context.Context, f fs.Fs, w io.Writer) error {
|
|||
// Lists in parallel which may get them out of order
|
||||
func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error {
|
||||
return ListFn(ctx, f, func(o fs.Object) {
|
||||
accounting.Stats.Checking(o.Remote())
|
||||
accounting.Stats(ctx).Checking(o.Remote())
|
||||
modTime := o.ModTime(ctx)
|
||||
accounting.Stats.DoneChecking(o.Remote())
|
||||
accounting.Stats(ctx).DoneChecking(o.Remote())
|
||||
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
||||
})
|
||||
}
|
||||
|
@ -968,9 +968,9 @@ func DropboxHashSum(ctx context.Context, f fs.Fs, w io.Writer) error {
|
|||
// hashSum returns the human readable hash for ht passed in. This may
|
||||
// be UNSUPPORTED or ERROR.
|
||||
func hashSum(ctx context.Context, ht hash.Type, o fs.Object) string {
|
||||
accounting.Stats.Checking(o.Remote())
|
||||
accounting.Stats(ctx).Checking(o.Remote())
|
||||
sum, err := o.Hash(ctx, ht)
|
||||
accounting.Stats.DoneChecking(o.Remote())
|
||||
accounting.Stats(ctx).DoneChecking(o.Remote())
|
||||
if err == hash.ErrUnsupported {
|
||||
sum = "UNSUPPORTED"
|
||||
} else if err != nil {
|
||||
|
@ -1167,7 +1167,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
|
|||
var mu sync.Mutex
|
||||
return ListFn(ctx, f, func(o fs.Object) {
|
||||
var err error
|
||||
tr := accounting.Stats.NewTransfer(o)
|
||||
tr := accounting.Stats(ctx).NewTransfer(o)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -1206,7 +1206,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
|
|||
|
||||
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
|
||||
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) {
|
||||
tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1)
|
||||
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -1527,7 +1527,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
|
|||
if size >= 0 {
|
||||
var err error
|
||||
// Size known use Put
|
||||
tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size)
|
||||
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -1664,7 +1664,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
|
|||
}
|
||||
return errors.Wrap(err, "error while attempting to move file to a temporary location")
|
||||
}
|
||||
tr := accounting.Stats.NewTransfer(srcObj)
|
||||
tr := accounting.Stats(ctx).NewTransfer(srcObj)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
|
@ -1711,11 +1711,11 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
|
|||
|
||||
_, err = Op(ctx, fdst, dstObj, dstFileName, srcObj)
|
||||
} else {
|
||||
accounting.Stats.Checking(srcFileName)
|
||||
accounting.Stats(ctx).Checking(srcFileName)
|
||||
if !cp {
|
||||
err = DeleteFile(ctx, srcObj)
|
||||
}
|
||||
defer accounting.Stats.DoneChecking(srcFileName)
|
||||
defer accounting.Stats(ctx).DoneChecking(srcFileName)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
// fstest.CheckItems() before use. This make sure the directory
|
||||
// listing is now consistent and stops cascading errors.
|
||||
//
|
||||
// Call accounting.Stats.ResetCounters() before every fs.Sync() as it
|
||||
// Call accounting.GlobalStats().ResetCounters() before every fs.Sync() as it
|
||||
// uses the error count internally.
|
||||
|
||||
package operations_test
|
||||
|
@ -315,15 +315,15 @@ func testCheck(t *testing.T, checkFunction func(ctx context.Context, fdst, fsrc
|
|||
|
||||
check := func(i int, wantErrors int64, wantChecks int64, oneway bool) {
|
||||
fs.Debugf(r.Fremote, "%d: Starting check test", i)
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
var buf bytes.Buffer
|
||||
log.SetOutput(&buf)
|
||||
defer func() {
|
||||
log.SetOutput(os.Stderr)
|
||||
}()
|
||||
err := checkFunction(context.Background(), r.Fremote, r.Flocal, oneway)
|
||||
gotErrors := accounting.Stats.GetErrors()
|
||||
gotChecks := accounting.Stats.GetChecks()
|
||||
gotErrors := accounting.GlobalStats().GetErrors()
|
||||
gotChecks := accounting.GlobalStats().GetChecks()
|
||||
if wantErrors == 0 && err != nil {
|
||||
t.Errorf("%d: Got error when not expecting one: %v", i, err)
|
||||
}
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
// Manage background jobs that the rc is running
|
||||
|
||||
package rc
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs/rc"
|
||||
|
||||
"github.com/ncw/rclone/fs/accounting"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -16,13 +21,14 @@ import (
|
|||
type Job struct {
|
||||
mu sync.Mutex
|
||||
ID int64 `json:"id"`
|
||||
Group string `json:"group"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
EndTime time.Time `json:"endTime"`
|
||||
Error string `json:"error"`
|
||||
Finished bool `json:"finished"`
|
||||
Success bool `json:"success"`
|
||||
Duration float64 `json:"duration"`
|
||||
Output Params `json:"output"`
|
||||
Output rc.Params `json:"output"`
|
||||
Stop func() `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -96,11 +102,11 @@ func (jobs *Jobs) Get(ID int64) *Job {
|
|||
}
|
||||
|
||||
// mark the job as finished
|
||||
func (job *Job) finish(out Params, err error) {
|
||||
func (job *Job) finish(out rc.Params, err error) {
|
||||
job.mu.Lock()
|
||||
job.EndTime = time.Now()
|
||||
if out == nil {
|
||||
out = make(Params)
|
||||
out = make(rc.Params)
|
||||
}
|
||||
job.Output = out
|
||||
job.Duration = job.EndTime.Sub(job.StartTime).Seconds()
|
||||
|
@ -117,7 +123,7 @@ func (job *Job) finish(out Params, err error) {
|
|||
}
|
||||
|
||||
// run the job until completion writing the return status
|
||||
func (job *Job) run(ctx context.Context, fn Func, in Params) {
|
||||
func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
job.finish(nil, errors.Errorf("panic received: %v", r))
|
||||
|
@ -126,37 +132,93 @@ func (job *Job) run(ctx context.Context, fn Func, in Params) {
|
|||
job.finish(fn(ctx, in))
|
||||
}
|
||||
|
||||
// NewJob start a new Job off
|
||||
func (jobs *Jobs) NewJob(fn Func, in Params) *Job {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
func getGroup(in rc.Params) string {
|
||||
// Check to see if the group is set
|
||||
group, err := in.GetString("_group")
|
||||
if rc.NotErrParamNotFound(err) {
|
||||
fs.Errorf(nil, "Can't get _group param %+v", err)
|
||||
}
|
||||
delete(in, "_group")
|
||||
return group
|
||||
}
|
||||
|
||||
// NewAsyncJob start a new asynchronous Job off
|
||||
func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job {
|
||||
id := atomic.AddInt64(&jobID, 1)
|
||||
|
||||
group := getGroup(in)
|
||||
if group == "" {
|
||||
group = fmt.Sprintf("job/%d", id)
|
||||
}
|
||||
ctx := accounting.WithStatsGroup(context.Background(), group)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
stop := func() {
|
||||
cancel()
|
||||
// Wait for cancel to propagate before returning.
|
||||
<-ctx.Done()
|
||||
}
|
||||
job := &Job{
|
||||
ID: atomic.AddInt64(&jobID, 1),
|
||||
ID: id,
|
||||
Group: group,
|
||||
StartTime: time.Now(),
|
||||
Stop: stop,
|
||||
}
|
||||
go job.run(ctx, fn, in)
|
||||
jobs.mu.Lock()
|
||||
jobs.jobs[job.ID] = job
|
||||
jobs.mu.Unlock()
|
||||
go job.run(ctx, fn, in)
|
||||
return job
|
||||
|
||||
}
|
||||
|
||||
// StartJob starts a new job and returns a Param suitable for output
|
||||
func StartJob(fn Func, in Params) (Params, error) {
|
||||
job := running.NewJob(fn, in)
|
||||
out := make(Params)
|
||||
// NewSyncJob start a new synchronous Job off
|
||||
func (jobs *Jobs) NewSyncJob(ctx context.Context, in rc.Params) (*Job, context.Context) {
|
||||
id := atomic.AddInt64(&jobID, 1)
|
||||
group := getGroup(in)
|
||||
if group == "" {
|
||||
group = fmt.Sprintf("job/%d", id)
|
||||
}
|
||||
ctxG := accounting.WithStatsGroup(ctx, fmt.Sprintf("job/%d", id))
|
||||
ctx, cancel := context.WithCancel(ctxG)
|
||||
stop := func() {
|
||||
cancel()
|
||||
// Wait for cancel to propagate before returning.
|
||||
<-ctx.Done()
|
||||
}
|
||||
job := &Job{
|
||||
ID: id,
|
||||
Group: group,
|
||||
StartTime: time.Now(),
|
||||
Stop: stop,
|
||||
}
|
||||
jobs.mu.Lock()
|
||||
jobs.jobs[job.ID] = job
|
||||
jobs.mu.Unlock()
|
||||
return job, ctx
|
||||
}
|
||||
|
||||
// StartAsyncJob starts a new job asynchronously and returns a Param suitable
|
||||
// for output.
|
||||
func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) {
|
||||
job := running.NewAsyncJob(fn, in)
|
||||
out := make(rc.Params)
|
||||
out["jobid"] = job.ID
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ExecuteJob executes new job synchronously and returns a Param suitable for
|
||||
// output.
|
||||
func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) {
|
||||
job, ctx := running.NewSyncJob(ctx, in)
|
||||
job.run(ctx, fn, in)
|
||||
var err error
|
||||
if !job.Success {
|
||||
err = errors.New(job.Error)
|
||||
}
|
||||
return job.Output, job.ID, err
|
||||
}
|
||||
|
||||
func init() {
|
||||
Add(Call{
|
||||
rc.Add(rc.Call{
|
||||
Path: "job/status",
|
||||
Fn: rcJobStatus,
|
||||
Title: "Reads the status of the job ID",
|
||||
|
@ -179,7 +241,7 @@ Results
|
|||
}
|
||||
|
||||
// Returns the status of a job
|
||||
func rcJobStatus(ctx context.Context, in Params) (out Params, err error) {
|
||||
func rcJobStatus(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
jobID, err := in.GetInt64("jobid")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -190,8 +252,8 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) {
|
|||
}
|
||||
job.mu.Lock()
|
||||
defer job.mu.Unlock()
|
||||
out = make(Params)
|
||||
err = Reshape(&out, job)
|
||||
out = make(rc.Params)
|
||||
err = rc.Reshape(&out, job)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "reshape failed in job status")
|
||||
}
|
||||
|
@ -199,7 +261,7 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
Add(Call{
|
||||
rc.Add(rc.Call{
|
||||
Path: "job/list",
|
||||
Fn: rcJobList,
|
||||
Title: "Lists the IDs of the running jobs",
|
||||
|
@ -212,14 +274,14 @@ Results
|
|||
}
|
||||
|
||||
// Returns list of job ids.
|
||||
func rcJobList(ctx context.Context, in Params) (out Params, err error) {
|
||||
out = make(Params)
|
||||
func rcJobList(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
out = make(rc.Params)
|
||||
out["jobids"] = running.IDs()
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
Add(Call{
|
||||
rc.Add(rc.Call{
|
||||
Path: "job/stop",
|
||||
Fn: rcJobStop,
|
||||
Title: "Stop the running job",
|
||||
|
@ -230,7 +292,7 @@ func init() {
|
|||
}
|
||||
|
||||
// Stops the running job.
|
||||
func rcJobStop(ctx context.Context, in Params) (out Params, err error) {
|
||||
func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
jobID, err := in.GetInt64("jobid")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -241,7 +303,7 @@ func rcJobStop(ctx context.Context, in Params) (out Params, err error) {
|
|||
}
|
||||
job.mu.Lock()
|
||||
defer job.mu.Unlock()
|
||||
out = make(Params)
|
||||
out = make(rc.Params)
|
||||
job.Stop()
|
||||
return out, nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package rc
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/fs/rc"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -36,10 +37,10 @@ func TestJobsExpire(t *testing.T) {
|
|||
jobs := newJobs()
|
||||
jobs.expireInterval = time.Millisecond
|
||||
assert.Equal(t, false, jobs.expireRunning)
|
||||
job := jobs.NewJob(func(ctx context.Context, in Params) (Params, error) {
|
||||
job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
defer close(wait)
|
||||
return in, nil
|
||||
}, Params{})
|
||||
}, rc.Params{})
|
||||
<-wait
|
||||
assert.Equal(t, 1, len(jobs.jobs))
|
||||
jobs.Expire()
|
||||
|
@ -57,14 +58,14 @@ func TestJobsExpire(t *testing.T) {
|
|||
jobs.mu.Unlock()
|
||||
}
|
||||
|
||||
var noopFn = func(ctx context.Context, in Params) (Params, error) {
|
||||
var noopFn = func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestJobsIDs(t *testing.T) {
|
||||
jobs := newJobs()
|
||||
job1 := jobs.NewJob(noopFn, Params{})
|
||||
job2 := jobs.NewJob(noopFn, Params{})
|
||||
job1 := jobs.NewAsyncJob(noopFn, rc.Params{})
|
||||
job2 := jobs.NewAsyncJob(noopFn, rc.Params{})
|
||||
wantIDs := []int64{job1.ID, job2.ID}
|
||||
gotIDs := jobs.IDs()
|
||||
require.Equal(t, 2, len(gotIDs))
|
||||
|
@ -76,17 +77,22 @@ func TestJobsIDs(t *testing.T) {
|
|||
|
||||
func TestJobsGet(t *testing.T) {
|
||||
jobs := newJobs()
|
||||
job := jobs.NewJob(noopFn, Params{})
|
||||
job := jobs.NewAsyncJob(noopFn, rc.Params{})
|
||||
assert.Equal(t, job, jobs.Get(job.ID))
|
||||
assert.Nil(t, jobs.Get(123123123123))
|
||||
}
|
||||
|
||||
var longFn = func(ctx context.Context, in Params) (Params, error) {
|
||||
var longFn = func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
time.Sleep(1 * time.Hour)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var ctxFn = func(ctx context.Context, in Params) (Params, error) {
|
||||
var shortFn = func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
time.Sleep(time.Millisecond)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var ctxFn = func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
|
@ -105,17 +111,17 @@ func sleepJob() {
|
|||
|
||||
func TestJobFinish(t *testing.T) {
|
||||
jobs := newJobs()
|
||||
job := jobs.NewJob(longFn, Params{})
|
||||
job := jobs.NewAsyncJob(longFn, rc.Params{})
|
||||
sleepJob()
|
||||
|
||||
assert.Equal(t, true, job.EndTime.IsZero())
|
||||
assert.Equal(t, Params(nil), job.Output)
|
||||
assert.Equal(t, rc.Params(nil), job.Output)
|
||||
assert.Equal(t, 0.0, job.Duration)
|
||||
assert.Equal(t, "", job.Error)
|
||||
assert.Equal(t, false, job.Success)
|
||||
assert.Equal(t, false, job.Finished)
|
||||
|
||||
wantOut := Params{"a": 1}
|
||||
wantOut := rc.Params{"a": 1}
|
||||
job.finish(wantOut, nil)
|
||||
|
||||
assert.Equal(t, false, job.EndTime.IsZero())
|
||||
|
@ -125,18 +131,18 @@ func TestJobFinish(t *testing.T) {
|
|||
assert.Equal(t, true, job.Success)
|
||||
assert.Equal(t, true, job.Finished)
|
||||
|
||||
job = jobs.NewJob(longFn, Params{})
|
||||
job = jobs.NewAsyncJob(longFn, rc.Params{})
|
||||
sleepJob()
|
||||
job.finish(nil, nil)
|
||||
|
||||
assert.Equal(t, false, job.EndTime.IsZero())
|
||||
assert.Equal(t, Params{}, job.Output)
|
||||
assert.Equal(t, rc.Params{}, job.Output)
|
||||
assert.True(t, job.Duration >= floatSleepTime)
|
||||
assert.Equal(t, "", job.Error)
|
||||
assert.Equal(t, true, job.Success)
|
||||
assert.Equal(t, true, job.Finished)
|
||||
|
||||
job = jobs.NewJob(longFn, Params{})
|
||||
job = jobs.NewAsyncJob(longFn, rc.Params{})
|
||||
sleepJob()
|
||||
job.finish(wantOut, errors.New("potato"))
|
||||
|
||||
|
@ -152,14 +158,14 @@ func TestJobFinish(t *testing.T) {
|
|||
// part of NewJob, now just test the panic catching
|
||||
func TestJobRunPanic(t *testing.T) {
|
||||
wait := make(chan struct{})
|
||||
boom := func(ctx context.Context, in Params) (Params, error) {
|
||||
boom := func(ctx context.Context, in rc.Params) (rc.Params, error) {
|
||||
sleepJob()
|
||||
defer close(wait)
|
||||
panic("boom")
|
||||
}
|
||||
|
||||
jobs := newJobs()
|
||||
job := jobs.NewJob(boom, Params{})
|
||||
job := jobs.NewAsyncJob(boom, rc.Params{})
|
||||
<-wait
|
||||
runtime.Gosched() // yield to make sure job is updated
|
||||
|
||||
|
@ -176,7 +182,7 @@ func TestJobRunPanic(t *testing.T) {
|
|||
|
||||
job.mu.Lock()
|
||||
assert.Equal(t, false, job.EndTime.IsZero())
|
||||
assert.Equal(t, Params{}, job.Output)
|
||||
assert.Equal(t, rc.Params{}, job.Output)
|
||||
assert.True(t, job.Duration >= floatSleepTime)
|
||||
assert.Equal(t, "panic received: boom", job.Error)
|
||||
assert.Equal(t, false, job.Success)
|
||||
|
@ -187,7 +193,7 @@ func TestJobRunPanic(t *testing.T) {
|
|||
func TestJobsNewJob(t *testing.T) {
|
||||
jobID = 0
|
||||
jobs := newJobs()
|
||||
job := jobs.NewJob(noopFn, Params{})
|
||||
job := jobs.NewAsyncJob(noopFn, rc.Params{})
|
||||
assert.Equal(t, int64(1), job.ID)
|
||||
assert.Equal(t, job, jobs.Get(1))
|
||||
assert.NotEmpty(t, job.Stop)
|
||||
|
@ -195,19 +201,26 @@ func TestJobsNewJob(t *testing.T) {
|
|||
|
||||
func TestStartJob(t *testing.T) {
|
||||
jobID = 0
|
||||
out, err := StartJob(longFn, Params{})
|
||||
out, err := StartAsyncJob(longFn, rc.Params{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, Params{"jobid": int64(1)}, out)
|
||||
assert.Equal(t, rc.Params{"jobid": int64(1)}, out)
|
||||
}
|
||||
|
||||
func TestExecuteJob(t *testing.T) {
|
||||
jobID = 0
|
||||
_, id, err := ExecuteJob(context.Background(), shortFn, rc.Params{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), id)
|
||||
}
|
||||
|
||||
func TestRcJobStatus(t *testing.T) {
|
||||
jobID = 0
|
||||
_, err := StartJob(longFn, Params{})
|
||||
_, err := StartAsyncJob(longFn, rc.Params{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
call := Calls.Get("job/status")
|
||||
call := rc.Calls.Get("job/status")
|
||||
assert.NotNil(t, call)
|
||||
in := Params{"jobid": 1}
|
||||
in := rc.Params{"jobid": 1}
|
||||
out, err := call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
@ -216,12 +229,12 @@ func TestRcJobStatus(t *testing.T) {
|
|||
assert.Equal(t, false, out["finished"])
|
||||
assert.Equal(t, false, out["success"])
|
||||
|
||||
in = Params{"jobid": 123123123}
|
||||
in = rc.Params{"jobid": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "job not found")
|
||||
|
||||
in = Params{"jobidx": 123123123}
|
||||
in = rc.Params{"jobidx": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "Didn't find key")
|
||||
|
@ -229,45 +242,88 @@ func TestRcJobStatus(t *testing.T) {
|
|||
|
||||
func TestRcJobList(t *testing.T) {
|
||||
jobID = 0
|
||||
_, err := StartJob(longFn, Params{})
|
||||
_, err := StartAsyncJob(longFn, rc.Params{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
call := Calls.Get("job/list")
|
||||
call := rc.Calls.Get("job/list")
|
||||
assert.NotNil(t, call)
|
||||
in := Params{}
|
||||
in := rc.Params{}
|
||||
out, err := call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
assert.Equal(t, Params{"jobids": []int64{1}}, out)
|
||||
assert.Equal(t, rc.Params{"jobids": []int64{1}}, out)
|
||||
}
|
||||
|
||||
func TestRcJobStop(t *testing.T) {
|
||||
func TestRcAsyncJobStop(t *testing.T) {
|
||||
jobID = 0
|
||||
_, err := StartJob(ctxFn, Params{})
|
||||
_, err := StartAsyncJob(ctxFn, rc.Params{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
call := Calls.Get("job/stop")
|
||||
call := rc.Calls.Get("job/stop")
|
||||
assert.NotNil(t, call)
|
||||
in := Params{"jobid": 1}
|
||||
in := rc.Params{"jobid": 1}
|
||||
out, err := call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, out)
|
||||
|
||||
in = Params{"jobid": 123123123}
|
||||
in = rc.Params{"jobid": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "job not found")
|
||||
|
||||
in = Params{"jobidx": 123123123}
|
||||
in = rc.Params{"jobidx": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "Didn't find key")
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
call = Calls.Get("job/status")
|
||||
call = rc.Calls.Get("job/status")
|
||||
assert.NotNil(t, call)
|
||||
in = Params{"jobid": 1}
|
||||
in = rc.Params{"jobid": 1}
|
||||
out, err = call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
assert.Equal(t, float64(1), out["id"])
|
||||
assert.Equal(t, "context canceled", out["error"])
|
||||
assert.Equal(t, true, out["finished"])
|
||||
assert.Equal(t, false, out["success"])
|
||||
}
|
||||
|
||||
func TestRcSyncJobStop(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
jobID = 0
|
||||
_, id, err := ExecuteJob(ctx, ctxFn, rc.Params{})
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, int64(1), id)
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
call := rc.Calls.Get("job/stop")
|
||||
assert.NotNil(t, call)
|
||||
in := rc.Params{"jobid": 1}
|
||||
out, err := call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, out)
|
||||
|
||||
in = rc.Params{"jobid": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "job not found")
|
||||
|
||||
in = rc.Params{"jobidx": 123123123}
|
||||
_, err = call.Fn(context.Background(), in)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "Didn't find key")
|
||||
|
||||
cancel()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
call = rc.Calls.Get("job/status")
|
||||
assert.NotNil(t, call)
|
||||
in = rc.Params{"jobid": 1}
|
||||
out, err = call.Fn(context.Background(), in)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
|
@ -3,6 +3,8 @@ package rcserver
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"mime"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -10,6 +12,10 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/skratchdot/open-golang/open"
|
||||
|
||||
"github.com/ncw/rclone/fs/rc/jobs"
|
||||
|
||||
"github.com/ncw/rclone/cmd/serve/httplib"
|
||||
"github.com/ncw/rclone/cmd/serve/httplib/serve"
|
||||
"github.com/ncw/rclone/fs"
|
||||
|
@ -18,7 +24,6 @@ import (
|
|||
"github.com/ncw/rclone/fs/list"
|
||||
"github.com/ncw/rclone/fs/rc"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/skratchdot/open-golang/open"
|
||||
)
|
||||
|
||||
// Start the remote control server if configured
|
||||
|
@ -79,7 +84,10 @@ func (s *Server) Serve() error {
|
|||
if user != "" || pass != "" {
|
||||
openURL.User = url.UserPassword(user, pass)
|
||||
}
|
||||
_ = open.Start(openURL.String())
|
||||
// Don't open browser if serving in testing environment.
|
||||
if flag.Lookup("test.v") == nil {
|
||||
_ = open.Start(openURL.String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -181,15 +189,16 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
|
|||
writeError(path, in, w, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused
|
||||
|
||||
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
|
||||
var out rc.Params
|
||||
if isAsync {
|
||||
out, err = rc.StartJob(call.Fn, in)
|
||||
out, err = jobs.StartAsyncJob(call.Fn, in)
|
||||
} else {
|
||||
out, err = call.Fn(r.Context(), in)
|
||||
var jobID int64
|
||||
out, jobID, err = jobs.ExecuteJob(r.Context(), call.Fn, in)
|
||||
w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID))
|
||||
}
|
||||
if err != nil {
|
||||
writeError(path, in, w, err, http.StatusInternalServerError)
|
||||
|
|
|
@ -611,10 +611,7 @@ func TestRCAsync(t *testing.T) {
|
|||
ContentType: "application/json",
|
||||
Body: `{ "_async":true }`,
|
||||
Status: http.StatusOK,
|
||||
Expected: `{
|
||||
"jobid": 1
|
||||
}
|
||||
`,
|
||||
Contains: regexp.MustCompile(`(?s)\{.*\"jobid\":.*\}`),
|
||||
}, {
|
||||
Name: "bad",
|
||||
URL: "rc/noop",
|
||||
|
|
|
@ -82,12 +82,12 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
|
|||
dstEmptyDirs: make(map[string]fs.DirEntry),
|
||||
srcEmptyDirs: make(map[string]fs.DirEntry),
|
||||
noTraverse: fs.Config.NoTraverse,
|
||||
toBeChecked: newPipe(accounting.Stats.SetCheckQueue, fs.Config.MaxBacklog),
|
||||
toBeUploaded: newPipe(accounting.Stats.SetTransferQueue, fs.Config.MaxBacklog),
|
||||
toBeChecked: newPipe(accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog),
|
||||
toBeUploaded: newPipe(accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog),
|
||||
deleteFilesCh: make(chan fs.Object, fs.Config.Checkers),
|
||||
trackRenames: fs.Config.TrackRenames,
|
||||
commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
|
||||
toBeRenamed: newPipe(accounting.Stats.SetRenameQueue, fs.Config.MaxBacklog),
|
||||
toBeRenamed: newPipe(accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog),
|
||||
trackRenamesCh: make(chan fs.Object, fs.Config.Checkers),
|
||||
}
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
|
@ -215,7 +215,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
|
|||
return
|
||||
}
|
||||
src := pair.Src
|
||||
accounting.Stats.Checking(src.Remote())
|
||||
accounting.Stats(s.ctx).Checking(src.Remote())
|
||||
// Check to see if can store this
|
||||
if src.Storable() {
|
||||
NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir)
|
||||
|
@ -256,7 +256,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
|
|||
}
|
||||
}
|
||||
}
|
||||
accounting.Stats.DoneChecking(src.Remote())
|
||||
accounting.Stats(s.ctx).DoneChecking(src.Remote())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -401,7 +401,7 @@ func (s *syncCopyMove) stopDeleters() {
|
|||
// checkSrcMap is clear then it assumes that the any source files that
|
||||
// have been found have been removed from dstFiles already.
|
||||
func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
|
||||
if accounting.Stats.Errored() && !fs.Config.IgnoreErrors {
|
||||
if accounting.Stats(s.ctx).Errored() && !fs.Config.IgnoreErrors {
|
||||
fs.Errorf(s.fdst, "%v", fs.ErrorNotDeleting)
|
||||
return fs.ErrorNotDeleting
|
||||
}
|
||||
|
@ -437,7 +437,7 @@ func deleteEmptyDirectories(ctx context.Context, f fs.Fs, entriesMap map[string]
|
|||
if len(entriesMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
if accounting.Stats.Errored() && !fs.Config.IgnoreErrors {
|
||||
if accounting.Stats(ctx).Errored() && !fs.Config.IgnoreErrors {
|
||||
fs.Errorf(f, "%v", fs.ErrorNotDeletingDirs)
|
||||
return fs.ErrorNotDeletingDirs
|
||||
}
|
||||
|
@ -497,8 +497,8 @@ func copyEmptyDirectories(ctx context.Context, f fs.Fs, entries map[string]fs.Di
|
|||
}
|
||||
}
|
||||
|
||||
if accounting.Stats.Errored() {
|
||||
fs.Debugf(f, "failed to copy %d directories", accounting.Stats.GetErrors())
|
||||
if accounting.Stats(ctx).Errored() {
|
||||
fs.Debugf(f, "failed to copy %d directories", accounting.Stats(ctx).GetErrors())
|
||||
}
|
||||
|
||||
if okCount > 0 {
|
||||
|
@ -587,12 +587,12 @@ func (s *syncCopyMove) makeRenameMap() {
|
|||
for obj := range in {
|
||||
// only create hash for dst fs.Object if its size could match
|
||||
if _, found := possibleSizes[obj.Size()]; found {
|
||||
accounting.Stats.Checking(obj.Remote())
|
||||
accounting.Stats(s.ctx).Checking(obj.Remote())
|
||||
hash := s.renameHash(obj)
|
||||
if hash != "" {
|
||||
s.pushRenameMap(hash, obj)
|
||||
}
|
||||
accounting.Stats.DoneChecking(obj.Remote())
|
||||
accounting.Stats(s.ctx).DoneChecking(obj.Remote())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -105,7 +105,7 @@ func TestSyncNoTraverse(t *testing.T) {
|
|||
|
||||
file1 := r.WriteFile("sub dir/hello world", "hello world", t1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -306,24 +306,24 @@ func TestSyncBasedOnCheckSum(t *testing.T) {
|
|||
file1 := r.WriteFile("check sum", "-", t1)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly one file.
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
// Change last modified date only
|
||||
file2 := r.WriteFile("check sum", "-", t2)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred no files
|
||||
assert.Equal(t, int64(0), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
}
|
||||
|
@ -340,24 +340,24 @@ func TestSyncSizeOnly(t *testing.T) {
|
|||
file1 := r.WriteFile("sizeonly", "potato", t1)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly one file.
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
// Update mtime, md5sum but not length of file
|
||||
file2 := r.WriteFile("sizeonly", "POTATO", t2)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred no files
|
||||
assert.Equal(t, int64(0), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
}
|
||||
|
@ -374,24 +374,24 @@ func TestSyncIgnoreSize(t *testing.T) {
|
|||
file1 := r.WriteFile("ignore-size", "contents", t1)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly one file.
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
// Update size but not date of file
|
||||
file2 := r.WriteFile("ignore-size", "longer contents but same date", t1)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred no files
|
||||
assert.Equal(t, int64(0), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
}
|
||||
|
@ -402,24 +402,24 @@ func TestSyncIgnoreTimes(t *testing.T) {
|
|||
file1 := r.WriteBoth(context.Background(), "existing", "potato", t1)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly 0 files because the
|
||||
// files were identical.
|
||||
assert.Equal(t, int64(0), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers())
|
||||
|
||||
fs.Config.IgnoreTimes = true
|
||||
defer func() { fs.Config.IgnoreTimes = false }()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly one file even though the
|
||||
// files were identical.
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
@ -433,7 +433,7 @@ func TestSyncIgnoreExisting(t *testing.T) {
|
|||
fs.Config.IgnoreExisting = true
|
||||
defer func() { fs.Config.IgnoreExisting = false }()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
@ -441,7 +441,7 @@ func TestSyncIgnoreExisting(t *testing.T) {
|
|||
|
||||
// Change everything
|
||||
r.WriteFile("existing", "newpotatoes", t2)
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
// Items should not change
|
||||
|
@ -488,7 +488,7 @@ func TestSyncIgnoreErrors(t *testing.T) {
|
|||
fs.GetModifyWindow(r.Fremote),
|
||||
)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
fs.CountError(errors.New("boom"))
|
||||
assert.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false))
|
||||
|
||||
|
@ -532,7 +532,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) {
|
|||
fs.Config.DryRun = true
|
||||
defer func() { fs.Config.DryRun = false }()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -541,7 +541,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) {
|
|||
|
||||
fs.Config.DryRun = false
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -569,7 +569,7 @@ func TestSyncAfterChangingModtimeOnlyWithNoUpdateModTime(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
fstest.CheckItems(t, r.Fremote, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -590,7 +590,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
fstest.CheckItems(t, r.Fremote, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -598,7 +598,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
// We should have transferred exactly one file, not set the mod time
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
}
|
||||
|
||||
func TestSyncAfterAddingAFile(t *testing.T) {
|
||||
|
@ -610,7 +610,7 @@ func TestSyncAfterAddingAFile(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
|
@ -625,7 +625,7 @@ func TestSyncAfterChangingFilesSizeOnly(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
@ -648,7 +648,7 @@ func TestSyncAfterChangingContentsOnly(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
@ -664,7 +664,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileDryRun(t *testing.T) {
|
|||
file3 := r.WriteBoth(context.Background(), "empty space", "-", t2)
|
||||
|
||||
fs.Config.DryRun = true
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
fs.Config.DryRun = false
|
||||
require.NoError(t, err)
|
||||
|
@ -683,7 +683,7 @@ func TestSyncAfterRemovingAFileAndAddingAFile(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file2, file3)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file3)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file3)
|
||||
|
@ -729,7 +729,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDir(t *testing.T) {
|
|||
fs.GetModifyWindow(r.Fremote),
|
||||
)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -798,7 +798,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDirWithErrors(t *testing.T) {
|
|||
fs.GetModifyWindow(r.Fremote),
|
||||
)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
fs.CountError(errors.New("boom"))
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
assert.Equal(t, fs.ErrorNotDeleting, err)
|
||||
|
@ -876,7 +876,7 @@ func TestCopyDeleteBefore(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := CopyDir(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -899,14 +899,14 @@ func TestSyncWithExclude(t *testing.T) {
|
|||
filter.Active.Opt.MaxSize = -1
|
||||
}()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Fremote, file2, file1)
|
||||
|
||||
// Now sync the other way round and check enormous doesn't get
|
||||
// deleted as it is excluded from the sync
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Flocal, r.Fremote, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file2, file1, file3)
|
||||
|
@ -929,14 +929,14 @@ func TestSyncWithExcludeAndDeleteExcluded(t *testing.T) {
|
|||
filter.Active.Opt.DeleteExcluded = false
|
||||
}()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Fremote, file2)
|
||||
|
||||
// Check sync the other way round to make sure enormous gets
|
||||
// deleted even though it is excluded
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Flocal, r.Fremote, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
@ -971,7 +971,7 @@ func TestSyncWithUpdateOlder(t *testing.T) {
|
|||
fs.Config.ModifyWindow = oldModifyWindow
|
||||
}()
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Fremote, oneO, twoF, threeO, fourF, fiveF)
|
||||
|
@ -995,7 +995,7 @@ func TestSyncWithTrackRenames(t *testing.T) {
|
|||
f1 := r.WriteFile("potato", "Potato Content", t1)
|
||||
f2 := r.WriteFile("yam", "Yam Content", t2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false))
|
||||
|
||||
fstest.CheckItems(t, r.Fremote, f1, f2)
|
||||
|
@ -1004,7 +1004,7 @@ func TestSyncWithTrackRenames(t *testing.T) {
|
|||
// Now rename locally.
|
||||
f2 = r.RenameFile(f2, "yaml")
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false))
|
||||
|
||||
fstest.CheckItems(t, r.Fremote, f1, f2)
|
||||
|
@ -1012,15 +1012,15 @@ func TestSyncWithTrackRenames(t *testing.T) {
|
|||
if canTrackRenames {
|
||||
if r.Fremote.Features().Move == nil || r.Fremote.Name() == "TestUnion" { // union remote can Move but returns CantMove error
|
||||
// If no server side Move, we are falling back to Copy + Delete
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 1 copy
|
||||
assert.Equal(t, int64(4), accounting.Stats.GetChecks()) // 2 file checks + 1 move + 1 delete
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 1 copy
|
||||
assert.Equal(t, int64(4), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move + 1 delete
|
||||
} else {
|
||||
assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) // 0 copy
|
||||
assert.Equal(t, int64(3), accounting.Stats.GetChecks()) // 2 file checks + 1 move
|
||||
assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) // 0 copy
|
||||
assert.Equal(t, int64(3), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move
|
||||
}
|
||||
} else {
|
||||
assert.Equal(t, int64(2), accounting.Stats.GetChecks()) // 2 file checks
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 0 copy
|
||||
assert.Equal(t, int64(2), accounting.GlobalStats().GetChecks()) // 2 file checks
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 0 copy
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1049,7 +1049,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty
|
|||
fstest.CheckItems(t, FremoteMove, file2, file3)
|
||||
|
||||
// Do server side move
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = MoveDir(context.Background(), FremoteMove, r.Fremote, testDeleteEmptyDirs, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1076,7 +1076,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty
|
|||
}
|
||||
|
||||
// Move it back to a new empty remote, dst does not exist this time
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = MoveDir(context.Background(), FremoteMove2, FremoteMove, testDeleteEmptyDirs, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1439,7 +1439,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) {
|
|||
fdst, err := fs.NewFs(r.FremoteName + "/dst")
|
||||
require.NoError(t, err)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), fdst, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1464,7 +1464,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) {
|
|||
|
||||
// This should delete three and overwrite one again, checking
|
||||
// the files got overwritten correctly in backup-dir
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), fdst, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1518,7 +1518,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) {
|
|||
fdst, err := fs.NewFs(r.FremoteName + "/dst")
|
||||
require.NoError(t, err)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one")
|
||||
require.NoError(t, err)
|
||||
err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two")
|
||||
|
@ -1548,7 +1548,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) {
|
|||
|
||||
// This should delete three and overwrite one again, checking
|
||||
// the files got overwritten correctly in backup-dir
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one")
|
||||
require.NoError(t, err)
|
||||
err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two")
|
||||
|
@ -1594,13 +1594,13 @@ func TestSyncUTFNorm(t *testing.T) {
|
|||
file2 := r.WriteObject(context.Background(), Encoding2, "This is a old test", t2)
|
||||
fstest.CheckItems(t, r.Fremote, file2)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should have transferred exactly one file, but kept the
|
||||
// normalized state of the file.
|
||||
assert.Equal(t, int64(1), accounting.Stats.GetTransfers())
|
||||
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
file1.Path = file2.Path
|
||||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
@ -1620,7 +1620,7 @@ func TestSyncImmutable(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote)
|
||||
|
||||
// Should succeed
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
@ -1632,7 +1632,7 @@ func TestSyncImmutable(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file1)
|
||||
|
||||
// Should fail with ErrorImmutableModified and not modify local or remote files
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err = Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
assert.EqualError(t, err, fs.ErrorImmutableModified.Error())
|
||||
fstest.CheckItems(t, r.Flocal, file2)
|
||||
|
@ -1659,7 +1659,7 @@ func TestSyncIgnoreCase(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Fremote, file2)
|
||||
|
||||
// Should not copy files that are differently-cased but otherwise identical
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1)
|
||||
|
@ -1694,7 +1694,7 @@ func TestAbort(t *testing.T) {
|
|||
fstest.CheckItems(t, r.Flocal, file1, file2, file3)
|
||||
fstest.CheckItems(t, r.Fremote)
|
||||
|
||||
accounting.Stats.ResetCounters()
|
||||
accounting.GlobalStats().ResetCounters()
|
||||
|
||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
|
||||
|
|
|
@ -274,7 +274,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs
|
|||
expectedDirs = filterEmptyDirs(t, items, expectedDirs)
|
||||
}
|
||||
is := NewItems(items)
|
||||
oldErrors := accounting.Stats.GetErrors()
|
||||
ctx := context.Background()
|
||||
oldErrors := accounting.Stats(ctx).GetErrors()
|
||||
var objs []fs.Object
|
||||
var dirs []fs.Directory
|
||||
var err error
|
||||
|
@ -283,7 +284,6 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs
|
|||
wantListing1, wantListing2 := makeListingFromItems(items)
|
||||
gotListing := "<unset>"
|
||||
listingOK := false
|
||||
ctx := context.Background()
|
||||
for i := 1; i <= retries; i++ {
|
||||
objs, dirs, err = walk.GetAll(ctx, f, "", true, -1)
|
||||
if err != nil && err != fs.ErrorDirNotFound {
|
||||
|
@ -317,8 +317,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs
|
|||
}
|
||||
is.Done(t)
|
||||
// Don't notice an error when listing an empty directory
|
||||
if len(items) == 0 && oldErrors == 0 && accounting.Stats.GetErrors() == 1 {
|
||||
accounting.Stats.ResetErrors()
|
||||
if len(items) == 0 && oldErrors == 0 && accounting.Stats(ctx).GetErrors() == 1 {
|
||||
accounting.Stats(ctx).ResetErrors()
|
||||
}
|
||||
// Check the directories
|
||||
if expectedDirs != nil {
|
||||
|
|
|
@ -71,7 +71,7 @@ func (fh *ReadFileHandle) openPending() (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr := accounting.Stats.NewTransfer(o)
|
||||
tr := accounting.GlobalStats().NewTransfer(o)
|
||||
fh.done = tr.Done
|
||||
fh.r = tr.Account(r).WithBuffer() // account the transfer
|
||||
fh.opened = true
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/fs/accounting"
|
||||
"github.com/ncw/rclone/fs/log"
|
||||
"github.com/ncw/rclone/fs/operations"
|
||||
"github.com/ncw/rclone/lib/file"
|
||||
|
@ -87,10 +86,6 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl
|
|||
// copy an object to or from the remote while accounting for it
|
||||
func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
||||
if operations.NeedTransfer(context.TODO(), dst, src) {
|
||||
tr := accounting.Stats.NewTransfer(src)
|
||||
defer func() {
|
||||
tr.Done(err)
|
||||
}()
|
||||
newDst, err = operations.Copy(context.TODO(), f, dst, remote, src)
|
||||
} else {
|
||||
newDst = dst
|
||||
|
|
Loading…
Reference in New Issue
Block a user