From be0464f5f16523b33f36962c53048b6bb003d1cb Mon Sep 17 00:00:00 2001 From: Aleksandar Jankovic Date: Tue, 16 Jul 2019 13:56:20 +0200 Subject: [PATCH] accounting: change stats interface This is done to make clear ownership over accounting object and prepare for removing global stats object. Stats elapsed time calculation has been altered to account for actual transfer time instead of stats creation time. --- cmd/serve/ftp/ftp.go | 10 ++-- cmd/serve/http/http.go | 4 +- cmd/serve/httplib/serve/dir.go | 4 +- cmd/serve/httplib/serve/serve.go | 17 ++----- fs/accounting/accounting.go | 19 ++++---- fs/accounting/accounting_test.go | 44 ++++++++--------- fs/accounting/stats.go | 72 +++++++++++++++++++++++++--- fs/accounting/stats_test.go | 51 ++++++++++++++++++++ fs/accounting/transfer.go | 72 ++++++++++++++++++++++++++++ fs/operations/multithread.go | 5 +- fs/operations/multithread_test.go | 10 +++- fs/operations/operations.go | 78 ++++++++++++++----------------- vfs/read.go | 14 ++++-- vfs/read_write.go | 6 ++- 14 files changed, 288 insertions(+), 118 deletions(-) create mode 100644 fs/accounting/transfer.go diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index 4f836bcd9..c601e47d9 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -214,8 +214,10 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er } // Account the transfer - accounting.Stats.Transferring(path) - defer accounting.Stats.DoneTransferring(path, true) + tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + defer func() { + tr.Done(err) + }() for _, file := range dirEntries { err = callback(&FileInfo{file, file.Mode(), d.vfs.Opt.UID, d.vfs.Opt.GID}) @@ -311,8 +313,8 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose } // Account the transfer - accounting.Stats.Transferring(path) - defer accounting.Stats.DoneTransferring(path, true) + tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + defer tr.Done(nil) return node.Size(), handle, nil } diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index a170f11e2..b8c8f393f 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -187,8 +187,8 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string }() // Account the transfer - accounting.Stats.Transferring(remote) - defer accounting.Stats.DoneTransferring(remote, true) + tr := accounting.Stats.NewTransfer(obj) + defer tr.Done(nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer // Serve the file diff --git a/cmd/serve/httplib/serve/dir.go b/cmd/serve/httplib/serve/dir.go index 4156d9f33..36ea35cc7 100644 --- a/cmd/serve/httplib/serve/dir.go +++ b/cmd/serve/httplib/serve/dir.go @@ -75,8 +75,8 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) { // Serve serves a directory func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer - accounting.Stats.Transferring(d.DirRemote) - defer accounting.Stats.DoneTransferring(d.DirRemote, true) + tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1) + defer tr.Done(nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index ef7b019b1..dba1ca8fa 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -75,22 +75,11 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } - accounting.Stats.Transferring(o.Remote()) - in := accounting.NewAccount(file, o) // account the transfer (no buffering) + tr := accounting.Stats.NewTransfer(o) defer func() { - closeErr := in.Close() - if closeErr != nil { - fs.Errorf(o, "Get request: close failed: %v", closeErr) - if err == nil { - err = closeErr - } - } - ok := err == nil - accounting.Stats.DoneTransferring(o.Remote(), ok) - if !ok { - accounting.Stats.Error(err) - } + tr.Done(err) }() + in := tr.Account(file) // account the transfer (no buffering) w.WriteHeader(code) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 0e6b766ec..4d3c2af4a 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -20,6 +20,7 @@ var ErrorMaxTransferLimitReached = fserrors.FatalError(errors.New("Max transfer // Account limits and accounts for one transfer type Account struct { + stats *StatsInfo // The mutex is to make sure Read() and Close() aren't called // concurrently. Unfortunately the persistent connection loop // in http transport calls Read() after Do() returns on @@ -45,10 +46,11 @@ type Account struct { const averagePeriod = 16 // period to do exponentially weighted averages over -// NewAccountSizeName makes a Account reader for an io.ReadCloser of +// newAccountSizeName makes a Account reader for an io.ReadCloser of // the given size and name -func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { +func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { acc := &Account{ + stats: stats, in: in, close: in, origIn: in, @@ -60,15 +62,10 @@ func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { max: int64(fs.Config.MaxTransfer), } go acc.averageLoop() - Stats.inProgress.set(acc.name, acc) + stats.inProgress.set(acc.name, acc) return acc } -// NewAccount makes a Account reader for an object -func NewAccount(in io.ReadCloser, obj fs.Object) *Account { - return NewAccountSizeName(in, obj.Size(), obj.Remote()) -} - // WithBuffer - If the file is above a certain size it adds an Async reader func (acc *Account) WithBuffer() *Account { acc.withBuf = true @@ -157,7 +154,7 @@ func (acc *Account) averageLoop() { // Check the read is valid func (acc *Account) checkRead() (err error) { acc.statmu.Lock() - if acc.max >= 0 && Stats.GetBytes() >= acc.max { + if acc.max >= 0 && acc.stats.GetBytes() >= acc.max { acc.statmu.Unlock() return ErrorMaxTransferLimitReached } @@ -177,7 +174,7 @@ func (acc *Account) accountRead(n int) { acc.bytes += int64(n) acc.statmu.Unlock() - Stats.Bytes(int64(n)) + acc.stats.Bytes(int64(n)) limitBandwidth(n) } @@ -219,7 +216,7 @@ func (acc *Account) Close() error { } acc.closed = true close(acc.exit) - Stats.inProgress.clear(acc.name) + acc.stats.inProgress.clear(acc.name) if acc.close == nil { return nil } diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 0767745f7..55b864dae 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -12,7 +12,6 @@ import ( "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/asyncreader" "github.com/ncw/rclone/fs/fserrors" - "github.com/ncw/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -27,36 +26,27 @@ var ( func TestNewAccountSizeName(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.Equal(t, in, acc.in) - assert.Equal(t, acc, Stats.inProgress.get("test")) + assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() assert.NoError(t, err) - assert.Nil(t, Stats.inProgress.get("test")) -} - -func TestNewAccount(t *testing.T) { - obj := mockobject.Object("test") - in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccount(in, obj) - assert.Equal(t, in, acc.in) - assert.Equal(t, acc, Stats.inProgress.get("test")) - err := acc.Close() - assert.NoError(t, err) - assert.Nil(t, Stats.inProgress.get("test")) + assert.Nil(t, stats.inProgress.get("test")) } func TestAccountWithBuffer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, -1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, -1, "test") acc.WithBuffer() // should have a buffer for an unknown size _, ok := acc.in.(*asyncreader.AsyncReader) require.True(t, ok) assert.NoError(t, acc.Close()) - acc = NewAccountSizeName(in, 1, "test") + acc = newAccountSizeName(stats, in, 1, "test") acc.WithBuffer() // should not have a buffer for a small size _, ok = acc.in.(*asyncreader.AsyncReader) @@ -66,7 +56,8 @@ func TestAccountWithBuffer(t *testing.T) { func TestAccountGetUpdateReader(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.Equal(t, in, acc.GetReader()) @@ -80,12 +71,13 @@ func TestAccountGetUpdateReader(t *testing.T) { func TestAccountRead(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") assert.True(t, acc.start.IsZero()) assert.Equal(t, 0, acc.lpBytes) assert.Equal(t, int64(0), acc.bytes) - assert.Equal(t, int64(0), Stats.bytes) + assert.Equal(t, int64(0), stats.bytes) var buf = make([]byte, 2) n, err := acc.Read(buf) @@ -96,7 +88,7 @@ func TestAccountRead(t *testing.T) { assert.False(t, acc.start.IsZero()) assert.Equal(t, 2, acc.lpBytes) assert.Equal(t, int64(2), acc.bytes) - assert.Equal(t, int64(2), Stats.bytes) + assert.Equal(t, int64(2), stats.bytes) n, err = acc.Read(buf) assert.NoError(t, err) @@ -112,7 +104,8 @@ func TestAccountRead(t *testing.T) { func TestAccountString(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 3, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 3, "test") // FIXME not an exhaustive test! @@ -131,7 +124,8 @@ func TestAccountString(t *testing.T) { // Test the Accounter interface methods on Account and accountStream func TestAccountAccounter(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - acc := NewAccountSizeName(in, 3, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 3, "test") assert.True(t, in == acc.OldStream()) @@ -192,10 +186,10 @@ func TestAccountMaxTransfer(t *testing.T) { defer func() { fs.Config.MaxTransfer = old }() - Stats.ResetCounters() in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) - acc := NewAccountSizeName(in, 1, "test") + stats := NewStats() + acc := newAccountSizeName(stats, in, 1, "test") var b = make([]byte, 10) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 0dbcfaf5b..743940780 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "sync" "time" @@ -87,8 +88,8 @@ type StatsInfo struct { renameQueue int renameQueueSize int64 deletes int64 - start time.Time inProgress *inProgress + startedTransfers []*Transfer } // NewStats creates an initialised StatsInfo @@ -96,7 +97,6 @@ func NewStats() *StatsInfo { return &StatsInfo{ checking: newStringSet(fs.Config.Checkers, "checking"), transferring: newStringSet(fs.Config.Transfers, "transferring"), - start: time.Now(), inProgress: newInProgress(), } } @@ -105,7 +105,7 @@ func NewStats() *StatsInfo { func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { out = make(rc.Params) s.mu.RLock() - dt := time.Now().Sub(s.start) + dt := s.totalDuration() dtSeconds := dt.Seconds() speed := 0.0 if dt > 0 { @@ -149,6 +149,52 @@ func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Param return out, nil } +type timeRange struct { + start time.Time + end time.Time +} + +// Total duration is union of durations of all transfers belonging to this +// object. +// Needs to be protected by mutex. +func (s *StatsInfo) totalDuration() time.Duration { + now := time.Now() + // Extract time ranges of all transfers. + timeRanges := make([]timeRange, len(s.startedTransfers)) + for i := range s.startedTransfers { + start, end := s.startedTransfers[i].TimeRange() + if end.IsZero() { + end = now + } + timeRanges[i] = timeRange{start, end} + } + + // Sort by the starting time. + sort.Slice(timeRanges, func(i, j int) bool { + return timeRanges[i].start.Before(timeRanges[j].start) + }) + + // Merge overlaps and add distinctive ranges together for total. + var total time.Duration + var i, j = 0, 1 + for i < len(timeRanges) { + if j < len(timeRanges)-1 { + if timeRanges[j].start.Before(timeRanges[i].end) { + if timeRanges[i].end.Before(timeRanges[j].end) { + timeRanges[i].end = timeRanges[j].end + } + j++ + continue + } + } + total += timeRanges[i].end.Sub(timeRanges[i].start) + i = j + j++ + } + + return total +} + // eta returns the ETA of the current operation, // rounded to full seconds. // If the ETA cannot be determined 'ok' returns false. @@ -195,7 +241,7 @@ func (s *StatsInfo) String() string { s.mu.RLock() - dt := time.Now().Sub(s.start) + dt := s.totalDuration() dtSeconds := dt.Seconds() speed := 0.0 if dt > 0 { @@ -456,9 +502,16 @@ func (s *StatsInfo) GetTransfers() int64 { return s.transfers } -// Transferring adds a transfer into the stats -func (s *StatsInfo) Transferring(remote string) { +// NewTransfer adds a transfer to the stats from the object. +func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer { + s.transferring.add(obj.Remote()) + return newTransfer(s, obj) +} + +// NewTransferRemoteSize adds a transfer to the stats based on remote and size. +func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { s.transferring.add(remote) + return newTransferRemoteSize(s, remote, size) } // DoneTransferring removes a transfer from the stats @@ -496,3 +549,10 @@ func (s *StatsInfo) SetRenameQueue(n int, size int64) { s.renameQueueSize = size s.mu.Unlock() } + +// AddTransfer adds reference to the started transfer. +func (s *StatsInfo) AddTransfer(transfer *Transfer) { + s.mu.Lock() + s.startedTransfers = append(s.startedTransfers, transfer) + s.mu.Unlock() +} diff --git a/fs/accounting/stats_test.go b/fs/accounting/stats_test.go index 8db584ed6..6141e083b 100644 --- a/fs/accounting/stats_test.go +++ b/fs/accounting/stats_test.go @@ -128,3 +128,54 @@ func TestStatsError(t *testing.T) { assert.False(t, s.HadRetryError()) assert.Equal(t, time.Time{}, s.RetryAfter()) } + +func TestStatsTotalDuration(t *testing.T) { + time1 := time.Now().Add(-40 * time.Second) + time2 := time1.Add(10 * time.Second) + time3 := time2.Add(10 * time.Second) + time4 := time3.Add(10 * time.Second) + s := NewStats() + s.AddTransfer(&Transfer{ + startedAt: time2, + completedAt: time3, + }) + s.AddTransfer(&Transfer{ + startedAt: time2, + completedAt: time2.Add(time.Second), + }) + s.AddTransfer(&Transfer{ + startedAt: time1, + completedAt: time3, + }) + s.AddTransfer(&Transfer{ + startedAt: time3, + completedAt: time4, + }) + s.AddTransfer(&Transfer{ + startedAt: time.Now(), + }) + + time.Sleep(time.Millisecond) + + s.mu.Lock() + total := s.totalDuration() + s.mu.Unlock() + + assert.True(t, 30*time.Second < total && total < 31*time.Second, total) +} + +func TestStatsTotalDuration2(t *testing.T) { + time1 := time.Now().Add(-40 * time.Second) + time2 := time1.Add(10 * time.Second) + s := NewStats() + s.AddTransfer(&Transfer{ + startedAt: time1, + completedAt: time2, + }) + + s.mu.Lock() + total := s.totalDuration() + s.mu.Unlock() + + assert.Equal(t, 10*time.Second, total) +} diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go new file mode 100644 index 000000000..4ac1d53c6 --- /dev/null +++ b/fs/accounting/transfer.go @@ -0,0 +1,72 @@ +package accounting + +import ( + "io" + "sync" + "time" + + "github.com/ncw/rclone/fs" +) + +// Transfer keeps track of initiated transfers and provides access to +// accounting functions. +// Transfer needs to be closed on completion. +type Transfer struct { + stats *StatsInfo + acc *Account + remote string + size int64 + + mu sync.Mutex + startedAt time.Time + completedAt time.Time +} + +// newTransfer instantiates new transfer +func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer { + return newTransferRemoteSize(stats, obj.Remote(), obj.Size()) +} + +func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfer { + tr := &Transfer{ + stats: stats, + remote: remote, + size: size, + startedAt: time.Now(), + } + stats.AddTransfer(tr) + return tr +} + +// Done ends the transfer. +// Must be called after transfer is finished to run proper cleanups. +func (tr *Transfer) Done(err error) { + if err != nil { + tr.stats.Error(err) + } + if tr.acc != nil { + if err := tr.acc.Close(); err != nil { + fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err) + } + } + tr.stats.DoneTransferring(tr.remote, err == nil) + tr.mu.Lock() + tr.completedAt = time.Now() + tr.mu.Unlock() +} + +// Account returns reader that knows how to keep track of transfer progress. +func (tr *Transfer) Account(in io.ReadCloser) *Account { + if tr.acc != nil { + return tr.acc + } + return newAccountSizeName(tr.stats, in, tr.size, tr.remote) +} + +// TimeRange returns the time transfer started and ended at. If not completed +// it will return zero time for end time. +func (tr *Transfer) TimeRange() (time.Time, time.Time) { + tr.mu.Lock() + defer tr.mu.Unlock() + return tr.startedAt, tr.completedAt +} diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 56d7677ab..dfa8d683a 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -110,7 +110,7 @@ func (mc *multiThreadCopyState) calculateChunks() { } // Copy src to (f, remote) using streams download threads and the OpenWriterAt feature -func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int) (newDst fs.Object, err error) { +func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) { openWriterAt := f.Features().OpenWriterAt if openWriterAt == nil { return nil, errors.New("multi-thread copy: OpenWriterAt not supported") @@ -132,8 +132,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, mc.calculateChunks() // Make accounting - mc.acc = accounting.NewAccount(nil, src) - defer fs.CheckClose(mc.acc, &err) + mc.acc = tr.Account(nil) // create write file handle mc.wc, err = openWriterAt(gCtx, remote, mc.size) diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index cd1883fb9..e27662005 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/ncw/rclone/fs/accounting" + "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest" "github.com/stretchr/testify/assert" @@ -49,6 +51,7 @@ func TestMultithreadCopy(t *testing.T) { {size: multithreadChunkSize*2 + 1, streams: 2}, } { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + var err error contents := fstest.RandomString(test.size) t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") file1 := r.WriteObject(context.Background(), "file1", contents, t1) @@ -57,8 +60,13 @@ func TestMultithreadCopy(t *testing.T) { src, err := r.Fremote.NewObject(context.Background(), "file1") require.NoError(t, err) + accounting.Stats.ResetCounters() + tr := accounting.Stats.NewTransfer(src) - dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2) + defer func() { + tr.Done(err) + }() + dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2, tr) require.NoError(t, err) assert.Equal(t, src.Size(), dst.Size()) assert.Equal(t, "file1", dst.Remote()) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 3529a847a..a22cc78a5 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -251,9 +251,9 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil) // It returns the destination object if possible. Note that this may // be nil. func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - accounting.Stats.Transferring(src.Remote()) + tr := accounting.Stats.NewTransfer(src) defer func() { - accounting.Stats.DoneTransferring(src.Remote(), err == nil) + tr.Done(err) }() newDst = dst if fs.Config.DryRun { @@ -304,7 +304,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj if streams < 2 { streams = 2 } - dst, err = multiThreadCopy(ctx, f, remote, src, int(streams)) + dst, err = multiThreadCopy(ctx, f, remote, src, int(streams), tr) if doUpdate { actionTaken = "Multi-thread Copied (replaced existing)" } else { @@ -326,7 +326,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) newDst = dst } else { - in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer + in := tr.Account(in0).WithBuffer() // account and buffer the transfer var wrappedSrc fs.ObjectInfo = src // We try to pass the original object if possible if src.Remote() != remote { @@ -854,17 +854,25 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", dst) } - in1 = accounting.NewAccount(in1, dst).WithBuffer() // account and buffer the transfer - defer fs.CheckClose(in1, &err) + tr1 := accounting.Stats.NewTransfer(dst) + defer func() { + tr1.Done(err) + }() + in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer in2, err := src.Open(ctx) if err != nil { return true, errors.Wrapf(err, "failed to open %q", src) } - in2 = accounting.NewAccount(in2, src).WithBuffer() // account and buffer the transfer - defer fs.CheckClose(in2, &err) + tr2 := accounting.Stats.NewTransfer(dst) + defer func() { + tr2.Done(err) + }() + in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer - return CheckEqualReaders(in1, in2) + // To assign err variable before defer. + differ, err = CheckEqualReaders(in1, in2) + return } // CheckDownload checks the files in fsrc and fdst according to Size @@ -1159,9 +1167,9 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { var mu sync.Mutex return ListFn(ctx, f, func(o fs.Object) { var err error - accounting.Stats.Transferring(o.Remote()) + tr := accounting.Stats.NewTransfer(o) defer func() { - accounting.Stats.DoneTransferring(o.Remote(), err == nil) + tr.Done(err) }() opt := fs.RangeOption{Start: offset, End: -1} size := o.Size() @@ -1183,19 +1191,8 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { } if count >= 0 { in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in} - // reduce remaining size to count - if size > count { - size = count - } } - in = accounting.NewAccountSizeName(in, size, o.Remote()).WithBuffer() // account and buffer the transfer - defer func() { - err = in.Close() - if err != nil { - fs.CountError(err) - fs.Errorf(o, "Failed to close: %v", err) - } - }() + in = tr.Account(in).WithBuffer() // account and buffer the transfer // take the lock just before we output stuff, so at the last possible moment mu.Lock() defer mu.Unlock() @@ -1209,14 +1206,11 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { // Rcat reads data from the Reader until EOF and uploads it to a file on remote func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { - accounting.Stats.Transferring(dstFileName) - in = accounting.NewAccountSizeName(in, -1, dstFileName).WithBuffer() + tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1) defer func() { - accounting.Stats.DoneTransferring(dstFileName, err == nil) - if otherErr := in.Close(); otherErr != nil { - fs.Debugf(fdst, "Rcat: failed to close source: %v", err) - } + tr.Done(err) }() + in = tr.Account(in).WithBuffer() hashOption := &fs.HashesOption{Hashes: fdst.Hashes()} hash, err := hash.NewMultiHasherTypes(fdst.Hashes()) @@ -1531,10 +1525,14 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo var obj fs.Object if size >= 0 { + var err error // Size known use Put - accounting.Stats.Transferring(dstFileName) - body := ioutil.NopCloser(in) // we let the server close the body - in := accounting.NewAccountSizeName(body, size, dstFileName) // account the transfer (no buffering) + tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size) + defer func() { + tr.Done(err) + }() + body := ioutil.NopCloser(in) // we let the server close the body + in := tr.Account(body) // account the transfer (no buffering) if fs.Config.DryRun { fs.Logf("stdin", "Not uploading as --dry-run") @@ -1543,15 +1541,6 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo return nil, err } - var err error - defer func() { - closeErr := in.Close() - if closeErr != nil { - accounting.Stats.Error(closeErr) - fs.Errorf(dstFileName, "Post request: close failed: %v", closeErr) - } - accounting.Stats.DoneTransferring(dstFileName, err == nil) - }() info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst) obj, err = fdst.Put(ctx, in, info) if err != nil { @@ -1675,14 +1664,15 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str } return errors.Wrap(err, "error while attempting to move file to a temporary location") } - accounting.Stats.Transferring(srcFileName) + tr := accounting.Stats.NewTransfer(srcObj) + defer func() { + tr.Done(err) + }() tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj) if err != nil { - accounting.Stats.DoneTransferring(srcFileName, false) return errors.Wrap(err, "error while moving file to temporary location") } _, err = Op(ctx, fdst, nil, dstFileName, tmpObj) - accounting.Stats.DoneTransferring(srcFileName, err == nil) return err } diff --git a/vfs/read.go b/vfs/read.go index 49c2a10f3..fc3107cf8 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -16,6 +16,7 @@ import ( // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { baseHandle + done func(err error) mu sync.Mutex closed bool // set if handle has been closed r *accounting.Account @@ -70,9 +71,11 @@ func (fh *ReadFileHandle) openPending() (err error) { if err != nil { return err } - fh.r = accounting.NewAccount(r, o).WithBuffer() // account the transfer + tr := accounting.Stats.NewTransfer(o) + fh.done = tr.Done + fh.r = tr.Account(r).WithBuffer() // account the transfer fh.opened = true - accounting.Stats.Transferring(o.Remote()) + return nil } @@ -347,9 +350,12 @@ func (fh *ReadFileHandle) close() error { fh.closed = true if fh.opened { - accounting.Stats.DoneTransferring(fh.remote, true) + var err error + defer func() { + fh.done(err) + }() // Close first so that we have hashes - err := fh.r.Close() + err = fh.r.Close() if err != nil { return err } diff --git a/vfs/read_write.go b/vfs/read_write.go index c3cc5a3eb..84a57a74e 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -87,9 +87,11 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl // copy an object to or from the remote while accounting for it func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { if operations.NeedTransfer(context.TODO(), dst, src) { - accounting.Stats.Transferring(src.Remote()) + tr := accounting.Stats.NewTransfer(src) + defer func() { + tr.Done(err) + }() newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) - accounting.Stats.DoneTransferring(src.Remote(), err == nil) } else { newDst = dst }