pikpak: correct file transfer progress for uploads by hash

Pikpak can accelerate file uploads by leveraging existing content 
in its storage (identified by a custom hash called gcid). 
Previously, file transfer statistics were incorrect for uploads 
without outbound traffic as the input stream remained unchanged.

This commit addresses the issue by:

* Removing unnecessary unwrapping/wrapping of accountings 
before/after gcid calculation, leading immediate AccountRead() on buffering.
* Correctly tracking file transfer statistics for uploads 
with no incoming/outgoing traffic by marking them as Server Side Copies.

This change ensures correct statistics tracking and improves overall user experience.
This commit is contained in:
wiserain 2024-07-20 21:50:08 +09:00 committed by GitHub
parent b3edc9d360
commit 31fabb3402
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1248,6 +1248,12 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, gcid string,
return nil, fmt.Errorf("invalid response: %+v", new) return nil, fmt.Errorf("invalid response: %+v", new)
} else if new.File.Phase == api.PhaseTypeComplete { } else if new.File.Phase == api.PhaseTypeComplete {
// early return; in case of zero-byte objects // early return; in case of zero-byte objects
if acc, ok := in.(*accounting.Account); ok && acc != nil {
// if `in io.Reader` is still in type of `*accounting.Account` (meaning that it is unused)
// it is considered as a server side copy as no incoming/outgoing traffic occur at all
acc.ServerSideTransferStart()
acc.ServerSideCopyEnd(size)
}
return new.File, nil return new.File, nil
} }
@ -1711,18 +1717,12 @@ func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, wi
return fmt.Errorf("failed to calculate gcid: %w", err) return fmt.Errorf("failed to calculate gcid: %w", err)
} }
} else { } else {
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
var wrap accounting.WrapFn
in, wrap = accounting.UnWrap(in)
var cleanup func() var cleanup func()
gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold)) gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold))
defer cleanup() defer cleanup()
if err != nil { if err != nil {
return fmt.Errorf("failed to calculate gcid: %w", err) return fmt.Errorf("failed to calculate gcid: %w", err)
} }
// Wrap the accounting back onto the stream
in = wrap(in)
} }
} }
fs.Debugf(o, "gcid = %s", gcid) fs.Debugf(o, "gcid = %s", gcid)