diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index 4f836bcd9..c601e47d9 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -214,8 +214,10 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er } // Account the transfer - accounting.Stats.Transferring(path) - defer accounting.Stats.DoneTransferring(path, true) + tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + defer func() { + tr.Done(err) + }() for _, file := range dirEntries { err = callback(&FileInfo{file, file.Mode(), d.vfs.Opt.UID, d.vfs.Opt.GID}) @@ -311,8 +313,8 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose } // Account the transfer - accounting.Stats.Transferring(path) - defer accounting.Stats.DoneTransferring(path, true) + tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + defer tr.Done(nil) return node.Size(), handle, nil } diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index a170f11e2..b8c8f393f 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -187,8 +187,8 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string }() // Account the transfer - accounting.Stats.Transferring(remote) - defer accounting.Stats.DoneTransferring(remote, true) + tr := accounting.Stats.NewTransfer(obj) + defer tr.Done(nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer // Serve the file diff --git a/cmd/serve/httplib/serve/dir.go b/cmd/serve/httplib/serve/dir.go index 4156d9f33..36ea35cc7 100644 --- a/cmd/serve/httplib/serve/dir.go +++ b/cmd/serve/httplib/serve/dir.go @@ -75,8 +75,8 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) { // Serve serves a directory func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer - accounting.Stats.Transferring(d.DirRemote) - defer accounting.Stats.DoneTransferring(d.DirRemote, true) + tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1) + defer tr.Done(nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index ef7b019b1..dba1ca8fa 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -75,22 +75,11 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } - accounting.Stats.Transferring(o.Remote()) - in := accounting.NewAccount(file, o) // account the transfer (no buffering) + tr := accounting.Stats.NewTransfer(o) defer func() { - closeErr := in.Close() - if closeErr != nil { - fs.Errorf(o, "Get request: close failed: %v", closeErr) - if err == nil { - err = closeErr - } - } - ok := err == nil - accounting.Stats.DoneTransferring(o.Remote(), ok) - if !ok { - accounting.Stats.Error(err) - } + tr.Done(err) }() + in := tr.Account(file) // account the transfer (no buffering) w.WriteHeader(code) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 0e6b766ec..4d3c2af4a 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -20,6 +20,7 @@ var ErrorMaxTransferLimitReached = fserrors.FatalError(errors.New("Max transfer // Account limits and accounts for one transfer type Account struct { + stats *StatsInfo // The mutex is to make sure Read() and Close() aren't called // concurrently. Unfortunately the persistent connection loop // in http transport calls Read() after Do() returns on @@ -45,10 +46,11 @@ type Account struct { const averagePeriod = 16 // period to do exponentially weighted averages over -// NewAccountSizeName makes a Account reader for an io.ReadCloser of +// newAccountSizeName makes a Account reader for an io.ReadCloser of // the given size and name -func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { +func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { acc := &Account{ + stats: stats, in: in, close: in, origIn: in, @@ -60,15 +62,10 @@ func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { max: int64(fs.Config.MaxTransfer), } go acc.averageLoop() - Stats.inProgress.set(acc.name, acc) + stats.inProgress.set(acc.name, acc) return acc } -// NewAccount makes a Account reader for an object -func NewAccount(in io.ReadCloser, obj fs.Object) *Account { - return NewAccountSizeName(in, obj.Size(), obj.Remote()) -} - // WithBuffer - If the file is above a certain size it adds an Async reader func (acc *Account) WithBuffer() *Account { acc.withBuf = true @@ -157,7 +154,7 @@ func (acc *Account) averageLoop() { // Check the read is valid func (acc *Account) checkRead() (err error) { acc.statmu.Lock() - if acc.max >= 0 && Stats.GetBytes() >= acc.max { + if acc.max >= 0 && acc.stats.GetBytes() >= acc.max { acc.statmu.Unlock() return ErrorMaxTransferLimitReached } @@ -177,7 +174,7 @@ func (acc *Account) accountRead(n int) { acc.bytes += int64(n) acc.statmu.Unlock() - Stats.Bytes(int64(n)) + acc.stats.Bytes(int64(n)) limitBandwidth(n) } @@ -219,7 +216,7 @@ func (acc *Account) Close() error { } acc.closed = true close(acc.exit) - Stats.inProgress.clear(acc.name) + acc.stats.inProgress.clear(acc.name) if acc.close == nil { return nil } diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 0767745f7..55b864dae 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -12,7 +12,6 @@ import ( "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/asyncreader" "github.com/ncw/rclone/fs/fserrors" - "github.com/ncw/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -27,36 +26,27 @@ var ( func TestNewAccountSizeName(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.Equal(t, in, acc.in) - assert.Equal(t, acc, Stats.inProgress.get("test")) + assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() assert.NoError(t, err) - assert.Nil(t, Stats.inProgress.get("test")) -} - -func TestNewAccount(t *testing.T) { - obj := mockobject.Object("test") - in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccount(in, obj) - assert.Equal(t, in, acc.in) - assert.Equal(t, acc, Stats.inProgress.get("test")) - err := acc.Close() - assert.NoError(t, err) - assert.Nil(t, Stats.inProgress.get("test")) + assert.Nil(t, stats.inProgress.get("test")) } func TestAccountWithBuffer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, -1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, -1, "test") acc.WithBuffer() // should have a buffer for an unknown size _, ok := acc.in.(*asyncreader.AsyncReader) require.True(t, ok) assert.NoError(t, acc.Close()) - acc = NewAccountSizeName(in, 1, "test") + acc = newAccountSizeName(stats, in, 1, "test") acc.WithBuffer() // should not have a buffer for a small size _, ok = acc.in.(*asyncreader.AsyncReader) @@ -66,7 +56,8 @@ func TestAccountWithBuffer(t *testing.T) { func TestAccountGetUpdateReader(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.Equal(t, in, acc.GetReader()) @@ -80,12 +71,13 @@ func TestAccountGetUpdateReader(t *testing.T) { func TestAccountRead(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.True(t, acc.start.IsZero()) assert.Equal(t, 0, acc.lpBytes) assert.Equal(t, int64(0), acc.bytes) - assert.Equal(t, int64(0), Stats.bytes) + assert.Equal(t, int64(0), stats.bytes) var buf = make([]byte, 2) n, err := acc.Read(buf) @@ -96,7 +88,7 @@ func TestAccountRead(t *testing.T) { assert.False(t, acc.start.IsZero()) assert.Equal(t, 2, acc.lpBytes) assert.Equal(t, int64(2), acc.bytes) - assert.Equal(t, int64(2), Stats.bytes) + assert.Equal(t, int64(2), stats.bytes) n, err = acc.Read(buf) assert.NoError(t, err) @@ -112,7 +104,8 @@ func TestAccountRead(t *testing.T) { func TestAccountString(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 3, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 3, "test") // FIXME not an exhaustive test! @@ -131,7 +124,8 @@ func TestAccountString(t *testing.T) { // Test the Accounter interface methods on Account and accountStream func TestAccountAccounter(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 3, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 3, "test") assert.True(t, in == acc.OldStream()) @@ -192,10 +186,10 @@ func TestAccountMaxTransfer(t *testing.T) { defer func() { fs.Config.MaxTransfer = old }() - Stats.ResetCounters() in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") var b = make([]byte, 10) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 0dbcfaf5b..743940780 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "sync" "time" @@ -87,8 +88,8 @@ type StatsInfo struct { renameQueue int renameQueueSize int64 deletes int64 - start time.Time inProgress *inProgress + startedTransfers []*Transfer } // NewStats creates an initialised StatsInfo @@ -96,7 +97,6 @@ func NewStats() *StatsInfo { return &StatsInfo{ checking: newStringSet(fs.Config.Checkers, "checking"), transferring: newStringSet(fs.Config.Transfers, "transferring"), - start: time.Now(), inProgress: newInProgress(), } } @@ -105,7 +105,7 @@ func NewStats() *StatsInfo { func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { out = make(rc.Params) s.mu.RLock() - dt := time.Now().Sub(s.start) + dt := s.totalDuration() dtSeconds := dt.Seconds() speed := 0.0 if dt > 0 { @@ -149,6 +149,52 @@ func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Param return out, nil } +type timeRange struct { + start time.Time + end time.Time +} + +// Total duration is union of durations of all transfers belonging to this +// object. +// Needs to be protected by mutex. +func (s *StatsInfo) totalDuration() time.Duration { + now := time.Now() + // Extract time ranges of all transfers. + timeRanges := make([]timeRange, len(s.startedTransfers)) + for i := range s.startedTransfers { + start, end := s.startedTransfers[i].TimeRange() + if end.IsZero() { + end = now + } + timeRanges[i] = timeRange{start, end} + } + + // Sort by the starting time. + sort.Slice(timeRanges, func(i, j int) bool { + return timeRanges[i].start.Before(timeRanges[j].start) + }) + + // Merge overlaps and add distinctive ranges together for total. + var total time.Duration + var i, j = 0, 1 + for i < len(timeRanges) { + if j < len(timeRanges)-1 { + if timeRanges[j].start.Before(timeRanges[i].end) { + if timeRanges[i].end.Before(timeRanges[j].end) { + timeRanges[i].end = timeRanges[j].end + } + j++ + continue + } + } + total += timeRanges[i].end.Sub(timeRanges[i].start) + i = j + j++ + } + + return total +} + // eta returns the ETA of the current operation, // rounded to full seconds. // If the ETA cannot be determined 'ok' returns false. @@ -195,7 +241,7 @@ func (s *StatsInfo) String() string { s.mu.RLock() - dt := time.Now().Sub(s.start) + dt := s.totalDuration() dtSeconds := dt.Seconds() speed := 0.0 if dt > 0 { @@ -456,9 +502,16 @@ func (s *StatsInfo) GetTransfers() int64 { return s.transfers } -// Transferring adds a transfer into the stats -func (s *StatsInfo) Transferring(remote string) { +// NewTransfer adds a transfer to the stats from the object. +func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { + s.transferring.add(obj.Remote()) + return newTransfer(s, obj) +} + +// NewTransferRemoteSize adds a transfer to the stats based on remote and size. +func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { s.transferring.add(remote) + return newTransferRemoteSize(s, remote, size) } // DoneTransferring removes a transfer from the stats @@ -496,3 +549,10 @@ func (s *StatsInfo) SetRenameQueue(n int, size int64) { s.renameQueueSize = size s.mu.Unlock() } + +// AddTransfer adds reference to the started transfer. +func (s *StatsInfo) AddTransfer(transfer *Transfer) { + s.mu.Lock() + s.startedTransfers = append(s.startedTransfers, transfer) + s.mu.Unlock() +} diff --git a/fs/accounting/stats_test.go b/fs/accounting/stats_test.go index 8db584ed6..6141e083b 100644 --- a/fs/accounting/stats_test.go +++ b/fs/accounting/stats_test.go @@ -128,3 +128,54 @@ func TestStatsError(t *testing.T) { assert.False(t, s.HadRetryError()) assert.Equal(t, time.Time{}, s.RetryAfter()) } + +func TestStatsTotalDuration(t *testing.T) { + time1 := time.Now().Add(-40 * time.Second) + time2 := time1.Add(10 * time.Second) + time3 := time2.Add(10 * time.Second) + time4 := time3.Add(10 * time.Second) + s := NewStats() + s.AddTransfer(&Transfer{ + startedAt: time2, + completedAt: time3, + }) + s.AddTransfer(&Transfer{ + startedAt: time2, + completedAt: time2.Add(time.Second), + }) + s.AddTransfer(&Transfer{ + startedAt: time1, + completedAt: time3, + }) + s.AddTransfer(&Transfer{ + startedAt: time3, + completedAt: time4, + }) + s.AddTransfer(&Transfer{ + startedAt: time.Now(), + }) + + time.Sleep(time.Millisecond) + + s.mu.Lock() + total := s.totalDuration() + s.mu.Unlock() + + assert.True(t, 30*time.Second < total && total < 31*time.Second, total) +} + +func TestStatsTotalDuration2(t *testing.T) { + time1 := time.Now().Add(-40 * time.Second) + time2 := time1.Add(10 * time.Second) + s := NewStats() + s.AddTransfer(&Transfer{ + startedAt: time1, + completedAt: time2, + }) + + s.mu.Lock() + total := s.totalDuration() + s.mu.Unlock() + + assert.Equal(t, 10*time.Second, total) +} diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go new file mode 100644 index 000000000..4ac1d53c6 --- /dev/null +++ b/fs/accounting/transfer.go @@ -0,0 +1,72 @@ +package accounting + +import ( + "io" + "sync" + "time" + + "github.com/ncw/rclone/fs" +) + +// Transfer keeps track of initiated transfers and provides access to +// accounting functions. +// Transfer needs to be closed on completion. +type Transfer struct { + stats *StatsInfo + acc *Account + remote string + size int64 + + mu sync.Mutex + startedAt time.Time + completedAt time.Time +} + +// newTransfer instantiates new transfer +func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer { + return newTransferRemoteSize(stats, obj.Remote(), obj.Size()) +} + +func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfer { + tr := &Transfer{ + stats: stats, + remote: remote, + size: size, + startedAt: time.Now(), + } + stats.AddTransfer(tr) + return tr +} + +// Done ends the transfer. +// Must be called after transfer is finished to run proper cleanups. +func (tr *Transfer) Done(err error) { + if err != nil { + tr.stats.Error(err) + } + if tr.acc != nil { + if err := tr.acc.Close(); err != nil { + fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err) + } + } + tr.stats.DoneTransferring(tr.remote, err == nil) + tr.mu.Lock() + tr.completedAt = time.Now() + tr.mu.Unlock() +} + +// Account returns reader that knows how to keep track of transfer progress. +func (tr *Transfer) Account(in io.ReadCloser) *Account { + if tr.acc != nil { + return tr.acc + } + return newAccountSizeName(tr.stats, in, tr.size, tr.remote) +} + +// TimeRange returns the time transfer started and ended at. If not completed +// it will return zero time for end time. +func (tr *Transfer) TimeRange() (time.Time, time.Time) { + tr.mu.Lock() + defer tr.mu.Unlock() + return tr.startedAt, tr.completedAt +} diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 56d7677ab..dfa8d683a 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -110,7 +110,7 @@ func (mc *multiThreadCopyState) calculateChunks() { } // Copy src to (f, remote) using streams download threads and the OpenWriterAt feature -func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int) (newDst fs.Object, err error) { +func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) { openWriterAt := f.Features().OpenWriterAt if openWriterAt == nil { return nil, errors.New("multi-thread copy: OpenWriterAt not supported") @@ -132,8 +132,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, mc.calculateChunks() // Make accounting - mc.acc = accounting.NewAccount(nil, src) - defer fs.CheckClose(mc.acc, &err) + mc.acc = tr.Account(nil) // create write file handle mc.wc, err = openWriterAt(gCtx, remote, mc.size) diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index cd1883fb9..e27662005 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/ncw/rclone/fs/accounting" + "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest" "github.com/stretchr/testify/assert" @@ -49,6 +51,7 @@ func TestMultithreadCopy(t *testing.T) { {size: multithreadChunkSize*2 + 1, streams: 2}, } { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + var err error contents := fstest.RandomString(test.size) t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") file1 := r.WriteObject(context.Background(), "file1", contents, t1) @@ -57,8 +60,13 @@ func TestMultithreadCopy(t *testing.T) { src, err := r.Fremote.NewObject(context.Background(), "file1") require.NoError(t, err) + accounting.Stats.ResetCounters() + tr := accounting.Stats.NewTransfer(src) - dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2) + defer func() { + tr.Done(err) + }() + dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2, tr) require.NoError(t, err) assert.Equal(t, src.Size(), dst.Size()) assert.Equal(t, "file1", dst.Remote()) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 3529a847a..a22cc78a5 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -251,9 +251,9 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil) // It returns the destination object if possible. Note that this may // be nil. func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - accounting.Stats.Transferring(src.Remote()) + tr := accounting.Stats.NewTransfer(src) defer func() { - accounting.Stats.DoneTransferring(src.Remote(), err == nil) + tr.Done(err) }() newDst = dst if fs.Config.DryRun { @@ -304,7 +304,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj if streams < 2 { streams = 2 } - dst, err = multiThreadCopy(ctx, f, remote, src, int(streams)) + dst, err = multiThreadCopy(ctx, f, remote, src, int(streams), tr) if doUpdate { actionTaken = "Multi-thread Copied (replaced existing)" } else { @@ -326,7 +326,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) newDst = dst } else { - in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer + in := tr.Account(in0).WithBuffer() // account and buffer the transfer var wrappedSrc fs.ObjectInfo = src // We try to pass the original object if possible if src.Remote() != remote { @@ -854,17 +854,25 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", dst) } - in1 = accounting.NewAccount(in1, dst).WithBuffer() // account and buffer the transfer - defer fs.CheckClose(in1, &err) + tr1 := accounting.Stats.NewTransfer(dst) + defer func() { + tr1.Done(err) + }() + in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer in2, err := src.Open(ctx) if err != nil { return true, errors.Wrapf(err, "failed to open %q", src) } - in2 = accounting.NewAccount(in2, src).WithBuffer() // account and buffer the transfer - defer fs.CheckClose(in2, &err) + tr2 := accounting.Stats.NewTransfer(dst) + defer func() { + tr2.Done(err) + }() + in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer - return CheckEqualReaders(in1, in2) + // To assign err variable before defer. + differ, err = CheckEqualReaders(in1, in2) + return } // CheckDownload checks the files in fsrc and fdst according to Size @@ -1159,9 +1167,9 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { var mu sync.Mutex return ListFn(ctx, f, func(o fs.Object) { var err error - accounting.Stats.Transferring(o.Remote()) + tr := accounting.Stats.NewTransfer(o) defer func() { - accounting.Stats.DoneTransferring(o.Remote(), err == nil) + tr.Done(err) }() opt := fs.RangeOption{Start: offset, End: -1} size := o.Size() @@ -1183,19 +1191,8 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { } if count >= 0 { in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in} - // reduce remaining size to count - if size > count { - size = count - } } - in = accounting.NewAccountSizeName(in, size, o.Remote()).WithBuffer() // account and buffer the transfer - defer func() { - err = in.Close() - if err != nil { - fs.CountError(err) - fs.Errorf(o, "Failed to close: %v", err) - } - }() + in = tr.Account(in).WithBuffer() // account and buffer the transfer // take the lock just before we output stuff, so at the last possible moment mu.Lock() defer mu.Unlock() @@ -1209,14 +1206,11 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { // Rcat reads data from the Reader until EOF and uploads it to a file on remote func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { - accounting.Stats.Transferring(dstFileName) - in = accounting.NewAccountSizeName(in, -1, dstFileName).WithBuffer() + tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1) defer func() { - accounting.Stats.DoneTransferring(dstFileName, err == nil) - if otherErr := in.Close(); otherErr != nil { - fs.Debugf(fdst, "Rcat: failed to close source: %v", err) - } + tr.Done(err) }() + in = tr.Account(in).WithBuffer() hashOption := &fs.HashesOption{Hashes: fdst.Hashes()} hash, err := hash.NewMultiHasherTypes(fdst.Hashes()) @@ -1531,10 +1525,14 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo var obj fs.Object if size >= 0 { + var err error // Size known use Put - accounting.Stats.Transferring(dstFileName) - body := ioutil.NopCloser(in) // we let the server close the body - in := accounting.NewAccountSizeName(body, size, dstFileName) // account the transfer (no buffering) + tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size) + defer func() { + tr.Done(err) + }() + body := ioutil.NopCloser(in) // we let the server close the body + in := tr.Account(body) // account the transfer (no buffering) if fs.Config.DryRun { fs.Logf("stdin", "Not uploading as --dry-run") @@ -1543,15 +1541,6 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo return nil, err } - var err error - defer func() { - closeErr := in.Close() - if closeErr != nil { - accounting.Stats.Error(closeErr) - fs.Errorf(dstFileName, "Post request: close failed: %v", closeErr) - } - accounting.Stats.DoneTransferring(dstFileName, err == nil) - }() info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst) obj, err = fdst.Put(ctx, in, info) if err != nil { @@ -1675,14 +1664,15 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str } return errors.Wrap(err, "error while attempting to move file to a temporary location") } - accounting.Stats.Transferring(srcFileName) + tr := accounting.Stats.NewTransfer(srcObj) + defer func() { + tr.Done(err) + }() tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj) if err != nil { - accounting.Stats.DoneTransferring(srcFileName, false) return errors.Wrap(err, "error while moving file to temporary location") } _, err = Op(ctx, fdst, nil, dstFileName, tmpObj) - accounting.Stats.DoneTransferring(srcFileName, err == nil) return err } diff --git a/vfs/read.go b/vfs/read.go index 49c2a10f3..fc3107cf8 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -16,6 +16,7 @@ import ( // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { baseHandle + done func(err error) mu sync.Mutex closed bool // set if handle has been closed r *accounting.Account @@ -70,9 +71,11 @@ func (fh *ReadFileHandle) openPending() (err error) { if err != nil { return err } - fh.r = accounting.NewAccount(r, o).WithBuffer() // account the transfer + tr := accounting.Stats.NewTransfer(o) + fh.done = tr.Done + fh.r = tr.Account(r).WithBuffer() // account the transfer fh.opened = true - accounting.Stats.Transferring(o.Remote()) + return nil } @@ -347,9 +350,12 @@ func (fh *ReadFileHandle) close() error { fh.closed = true if fh.opened { - accounting.Stats.DoneTransferring(fh.remote, true) + var err error + defer func() { + fh.done(err) + }() // Close first so that we have hashes - err := fh.r.Close() + err = fh.r.Close() if err != nil { return err } diff --git a/vfs/read_write.go b/vfs/read_write.go index c3cc5a3eb..84a57a74e 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -87,9 +87,11 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl // copy an object to or from the remote while accounting for it func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { if operations.NeedTransfer(context.TODO(), dst, src) { - accounting.Stats.Transferring(src.Remote()) + tr := accounting.Stats.NewTransfer(src) + defer func() { + tr.Done(err) + }() newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) - accounting.Stats.DoneTransferring(src.Remote(), err == nil) } else { newDst = dst }