From 7f15cc95565b0d9e7115a98a07404d05f35a1024 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 13 Feb 2020 14:27:50 +0000 Subject: [PATCH] operations: make ReOpen and NewReOpen public for re-use elsewhere --- fs/operations/multithread.go | 2 +- fs/operations/operations.go | 2 +- fs/operations/reopen.go | 16 ++++++++-------- fs/operations/reopen_test.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 1f615d4b2..54c228583 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -71,7 +71,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) - rc, err := newReOpen(ctx, mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries) + rc, err := NewReOpen(ctx, mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries) if err != nil { return errors.Wrap(err, "multpart copy: failed to open source") } diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 129ea7ca4..2d3a9046e 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -403,7 +403,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj } } else { var in0 io.ReadCloser - in0, err = newReOpen(ctx, src, hashOption, nil, fs.Config.LowLevelRetries) + in0, err = NewReOpen(ctx, src, hashOption, nil, fs.Config.LowLevelRetries) if err != nil { err = errors.Wrap(err, "failed to open source object") } else { diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go index 879bffee3..bd45f96a5 100644 --- a/fs/operations/reopen.go +++ b/fs/operations/reopen.go @@ -10,8 +10,8 @@ import ( "github.com/rclone/rclone/fs/fserrors" ) -// reOpen is a wrapper for an object reader which reopens the stream on error -type reOpen struct { +// ReOpen is a wrapper for an object reader which reopens the stream on error +type ReOpen struct { ctx context.Context mu sync.Mutex // mutex to protect the below src fs.Object // object to open @@ -30,14 +30,14 @@ var ( errorTooManyTries = errors.New("failed to reopen: too many retries") ) -// newReOpen makes a handle which will reopen itself and seek to where it was on errors +// NewReOpen makes a handle which will reopen itself and seek to where it was on errors // // If hashOption is set this will be applied when reading from the start // // If rangeOption is set then this will applied when reading from the // start, and updated on retries. -func newReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) { - h := &reOpen{ +func NewReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) { + h := &ReOpen{ ctx: ctx, src: src, hashOption: hashOption, @@ -56,7 +56,7 @@ func newReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, // open the underlying handle - call with lock held // // we don't retry here as the Open() call will itself have low level retries -func (h *reOpen) open() error { +func (h *ReOpen) open() error { var optsArray [2]fs.OpenOption var opts = optsArray[:0] if h.read == 0 { @@ -93,7 +93,7 @@ func (h *reOpen) open() error { } // Read bytes retrying as necessary -func (h *reOpen) Read(p []byte) (n int, err error) { +func (h *ReOpen) Read(p []byte) (n int, err error) { h.mu.Lock() defer h.mu.Unlock() if h.err != nil { @@ -119,7 +119,7 @@ func (h *reOpen) Read(p []byte) (n int, err error) { } // Close the stream -func (h *reOpen) Close() error { +func (h *ReOpen) Close() error { h.mu.Lock() defer h.mu.Unlock() if !h.opened { diff --git a/fs/operations/reopen_test.go b/fs/operations/reopen_test.go index 17d773fa1..dac33e3cd 100644 --- a/fs/operations/reopen_test.go +++ b/fs/operations/reopen_test.go @@ -15,7 +15,7 @@ import ( ) // check interface -var _ io.ReadCloser = (*reOpen)(nil) +var _ io.ReadCloser = (*ReOpen)(nil) var errorTestError = errors.New("test error") @@ -74,7 +74,7 @@ func TestReOpen(t *testing.T) { breaks: breaks, } hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} - return newReOpen(context.Background(), src, hashOption, rangeOption, maxRetries) + return NewReOpen(context.Background(), src, hashOption, rangeOption, maxRetries) } t.Run("Basics", func(t *testing.T) {