diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index 89c4f696d..fd0cdb5b8 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -79,7 +79,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { defer func() { tr.Done(err) }() - in := tr.Account(file) // account the transfer (no buffering) + in := tr.Account(r.Context(), file) // account the transfer (no buffering) w.WriteHeader(code) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 71aff851a..2686f2ebf 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -36,6 +36,7 @@ type Account struct { // shouldn't. mu sync.Mutex // mutex protects these values in io.Reader + ctx context.Context // current context for transfer - may change origIn io.ReadCloser close io.Closer size int64 @@ -64,10 +65,11 @@ const averagePeriod = 16 // period to do exponentially weighted averages over // newAccountSizeName makes an Account reader for an io.ReadCloser of // the given size and name -func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { +func newAccountSizeName(ctx context.Context, stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { acc := &Account{ stats: stats, in: in, + ctx: ctx, close: in, origIn: in, size: size, @@ -160,7 +162,7 @@ func (acc *Account) Abandon() { // UpdateReader updates the underlying io.ReadCloser stopping the // async buffer (if any) and re-adding it -func (acc *Account) UpdateReader(in io.ReadCloser) { +func (acc *Account) UpdateReader(ctx context.Context, in io.ReadCloser) { acc.mu.Lock() withBuf := acc.withBuf if withBuf { @@ -168,6 +170,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) { acc.withBuf = false } acc.in = in + acc.ctx = ctx acc.close = in acc.origIn = in acc.closed = false diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index bfee18a26..8161522f2 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -2,6 +2,7 @@ package accounting import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -29,7 +30,7 @@ var ( func TestNewAccountSizeName(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.Equal(t, in, acc.in) assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() @@ -44,7 +45,7 @@ func TestAccountWithBuffer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, -1, "test") + acc := newAccountSizeName(context.Background(), stats, in, -1, "test") assert.False(t, acc.HasBuffer()) acc.WithBuffer() assert.True(t, acc.HasBuffer()) @@ -53,7 +54,7 @@ func TestAccountWithBuffer(t *testing.T) { require.True(t, ok) assert.NoError(t, acc.Close()) - acc = newAccountSizeName(stats, in, 1, "test") + acc = newAccountSizeName(context.Background(), stats, in, 1, "test") acc.WithBuffer() // should not have a buffer for a small size _, ok = acc.in.(*asyncreader.AsyncReader) @@ -66,7 +67,7 @@ func TestAccountGetUpdateReader(t *testing.T) { return func(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.Equal(t, in, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -77,7 +78,7 @@ func TestAccountGetUpdateReader(t *testing.T) { } in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc.UpdateReader(in2) + acc.UpdateReader(context.Background(), in2) assert.Equal(t, in2, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -92,7 +93,7 @@ func TestAccountGetUpdateReader(t *testing.T) { func TestAccountRead(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.True(t, acc.values.start.IsZero()) acc.values.mu.Lock() @@ -133,7 +134,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) { } in := ioutil.NopCloser(bytes.NewBuffer(buf)) stats := NewStats() - acc := newAccountSizeName(stats, in, int64(len(buf)), "test") + acc := newAccountSizeName(context.Background(), stats, in, int64(len(buf)), "test") if withBuffer { acc = acc.WithBuffer() } @@ -173,7 +174,7 @@ func TestAccountWriteToWithBuffer(t *testing.T) { func TestAccountString(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 3, "test") + acc := newAccountSizeName(context.Background(), stats, in, 3, "test") // FIXME not an exhaustive test! @@ -193,7 +194,7 @@ func TestAccountString(t *testing.T) { func TestAccountAccounter(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 3, "test") + acc := newAccountSizeName(context.Background(), stats, in, 3, "test") assert.True(t, in == acc.OldStream()) @@ -260,7 +261,7 @@ func TestAccountMaxTransfer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") var b = make([]byte, 10) @@ -277,7 +278,7 @@ func TestAccountMaxTransfer(t *testing.T) { fs.Config.CutoffMode = fs.CutoffModeSoft stats = NewStats() - acc = newAccountSizeName(stats, in, 1, "test") + acc = newAccountSizeName(context.Background(), stats, in, 1, "test") n, err = acc.Read(b) assert.Equal(t, 10, n) @@ -302,7 +303,7 @@ func TestAccountMaxTransferWriteTo(t *testing.T) { in := ioutil.NopCloser(readers.NewPatternReader(1024)) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") var b bytes.Buffer diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index 8234d2730..8365a4a0a 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "encoding/json" "io" "sync" @@ -135,12 +136,12 @@ func (tr *Transfer) Reset() { } // Account returns reader that knows how to keep track of transfer progress. -func (tr *Transfer) Account(in io.ReadCloser) *Account { +func (tr *Transfer) Account(ctx context.Context, in io.ReadCloser) *Account { tr.mu.Lock() if tr.acc == nil { - tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote) + tr.acc = newAccountSizeName(ctx, tr.stats, in, tr.size, tr.remote) } else { - tr.acc.UpdateReader(in) + tr.acc.UpdateReader(ctx, in) } tr.mu.Unlock() return tr.acc diff --git a/fs/operations/check.go b/fs/operations/check.go index eded6f825..dc2f99c07 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -318,7 +318,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo defer func() { tr1.Done(nil) // error handling is done by the caller }() - in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer + in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer in2, err := src.Open(ctx) if err != nil { @@ -328,7 +328,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo defer func() { tr2.Done(nil) // error handling is done by the caller }() - in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer + in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer // To assign err variable before defer. differ, err = CheckEqualReaders(in1, in2) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index c5bed49bf..85b91bd74 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -158,7 +158,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, mc.calculateChunks() // Make accounting - mc.acc = tr.Account(nil) + mc.acc = tr.Account(ctx, nil) // create write file handle mc.wc, err = openWriterAt(gCtx, remote, mc.size) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index e785198de..2d3a65108 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -366,7 +366,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj return nil, accounting.ErrorMaxTransferLimitReachedFatal } if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { - in := tr.Account(nil) // account the transfer + in := tr.Account(ctx, nil) // account the transfer in.ServerSideCopyStart() newDst, err = doCopy(ctx, src, remote) if err == nil { @@ -421,7 +421,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) newDst = dst } else { - in := tr.Account(in0).WithBuffer() // account and buffer the transfer + in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer var wrappedSrc fs.ObjectInfo = src // We try to pass the original object if possible if src.Remote() != remote { @@ -1054,7 +1054,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { if count >= 0 { in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in} } - in = tr.Account(in).WithBuffer() // account and buffer the transfer + in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer // take the lock just before we output stuff, so at the last possible moment mu.Lock() defer mu.Unlock() @@ -1072,7 +1072,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, defer func() { tr.Done(err) }() - in = tr.Account(in).WithBuffer() + in = tr.Account(ctx, in).WithBuffer() readCounter := readers.NewCountingReader(in) var trackingIn io.Reader @@ -1420,7 +1420,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo tr.Done(err) }() body := ioutil.NopCloser(in) // we let the server close the body - in := tr.Account(body) // account the transfer (no buffering) + in := tr.Account(ctx, body) // account the transfer (no buffering) if SkipDestructive(ctx, dstFileName, "upload from pipe") { // prevents "broken pipe" errors diff --git a/vfs/read.go b/vfs/read.go index 532728b8a..dcbae07d7 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -79,7 +79,7 @@ func (fh *ReadFileHandle) openPending() (err error) { } tr := accounting.GlobalStats().NewTransfer(o) fh.done = tr.Done - fh.r = tr.Account(r).WithBuffer() // account the transfer + fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer fh.opened = true return nil @@ -158,7 +158,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { return err } } - fh.r.UpdateReader(r) + fh.r.UpdateReader(context.TODO(), r) fh.offset = offset return nil } diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index e5d3d3ce5..d8dfba5af 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -495,7 +495,7 @@ func (dl *downloader) open(offset int64) (err error) { if err != nil { return errors.Wrap(err, "vfs reader: failed to open source file") } - dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer + dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer dl.offset = offset