operations: run hashing operations in parallel #3419

Before this change, the post-copy hash check calculated the source and destination hashes sequentially.

Now we run the hashes in parallel for a useful speedup.

Note that this refactors the hash check in Copy to use the standard
hash checking routine.
This commit is contained in:
Nick Craig-Wood 2019-08-10 10:28:26 +01:00
parent 402aaca7fe
commit ae9c0e56c8

View File

@ -51,30 +51,46 @@ func CheckHashes(ctx context.Context, src fs.ObjectInfo, dst fs.Object) (equal b
if common.Count() == 0 {
return true, hash.None, nil
}
ht = common.GetOne()
srcHash, err := src.Hash(ctx, ht)
equal, ht, _, _, err = checkHashes(ctx, src, dst, common.GetOne())
return equal, ht, err
}
// checkHashes does the work of CheckHashes but takes a hash.Type and
// returns the effective hash type used.
//
// The src and dst hashes for ht are requested concurrently in an
// errgroup. In addition to the equality result and hash type it returns
// the srcHash and dstHash strings so that callers can report them.
//
// NOTE(review): this span looks like a diff rendering with the +/-
// markers stripped - the three-value returns and the second, sequential
// dst.Hash call are presumably removed pre-change lines shown next to
// their five-value replacements. Confirm against the committed file.
func checkHashes(ctx context.Context, src fs.ObjectInfo, dst fs.Object, ht hash.Type) (equal bool, htOut hash.Type, srcHash, dstHash string, err error) {
// Calculate hashes in parallel
// ctx is shadowed here by the errgroup's derived context, which is the
// one passed to both Hash calls in the goroutines below.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() (err error) {
// Assigns the enclosing function's named result srcHash; err is this
// closure's own named result, not the outer err.
srcHash, err = src.Hash(ctx, ht)
if err != nil {
// The failure is counted and logged here, at the point it occurs.
fs.CountError(err)
fs.Errorf(src, "Failed to calculate src hash: %v", err)
}
return err
})
g.Go(func() (err error) {
// Assigns the enclosing function's named result dstHash; err is this
// closure's own named result, not the outer err.
dstHash, err = dst.Hash(ctx, ht)
if err != nil {
// The failure is counted and logged here, at the point it occurs.
fs.CountError(err)
fs.Errorf(dst, "Failed to calculate dst hash: %v", err)
}
return err
})
// Wait for both goroutines to finish before srcHash and dstHash are read.
err = g.Wait()
if err != nil {
fs.CountError(err)
fs.Errorf(src, "Failed to calculate src hash: %v", err)
return false, ht, err
return false, ht, srcHash, dstHash, err
}
// A blank source hash is reported as equal with hash.None as the type.
if srcHash == "" {
return true, hash.None, nil
}
dstHash, err := dst.Hash(ctx, ht)
if err != nil {
fs.CountError(err)
fs.Errorf(dst, "Failed to calculate dst hash: %v", err)
return false, ht, err
return true, hash.None, srcHash, dstHash, nil
}
// A blank destination hash is likewise reported as equal with hash.None.
if dstHash == "" {
return true, hash.None, nil
return true, hash.None, srcHash, dstHash, nil
}
// On a mismatch, log both hashes together with the Fs each object came from.
if srcHash != dstHash {
fs.Debugf(src, "%v = %s (%v)", ht, srcHash, src.Fs())
fs.Debugf(dst, "%v = %s (%v)", ht, dstHash, dst.Fs())
}
return srcHash == dstHash, ht, nil
return srcHash == dstHash, ht, srcHash, dstHash, nil
}
// Equal checks to see if the src and dst objects are equal by looking at
@ -377,24 +393,14 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
// Verify hashes are the same after transfer - ignoring blank hashes
if !fs.Config.IgnoreChecksum && hashType != hash.None {
var srcSum string
srcSum, err = src.Hash(ctx, hashType)
if err != nil {
// checkHashes has logged and counted errors
equal, _, srcSum, dstSum, _ := checkHashes(ctx, src, dst, hashType)
if !equal {
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
fs.Errorf(dst, "%v", err)
fs.CountError(err)
fs.Errorf(src, "Failed to read src hash: %v", err)
} else if srcSum != "" {
var dstSum string
dstSum, err = dst.Hash(ctx, hashType)
if err != nil {
fs.CountError(err)
fs.Errorf(dst, "Failed to read hash: %v", err)
} else if !hash.Equals(srcSum, dstSum) {
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
fs.Errorf(dst, "%v", err)
fs.CountError(err)
removeFailedCopy(ctx, dst)
return newDst, err
}
removeFailedCopy(ctx, dst)
return newDst, err
}
}