mirror of
https://github.com/rclone/rclone.git
synced 2024-12-11 19:53:46 +08:00
76b7bcd4d7
Before this change, in the event of a retryable error, bisync would always retry the maximum number of times allowed by the `--retries` flag, even if one of the retries was successful. This change fixes the issue, so that bisync moves on after the first successful retry.
365 lines
10 KiB
Go
365 lines
10 KiB
Go
package bisync
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
mutex "sync" // renamed as "sync" already in use
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/cmd/bisync/bilib"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/filter"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
"github.com/rclone/rclone/fs/sync"
|
|
"github.com/rclone/rclone/lib/terminal"
|
|
)
|
|
|
|
// Results represents a pair of synced files, as reported by the LoggerFn
|
|
// Bisync uses this to determine what happened during the sync, and modify the listings accordingly
|
|
type Results struct {
|
|
Src string
|
|
Dst string
|
|
Name string
|
|
AltName string
|
|
Size int64
|
|
Modtime time.Time
|
|
Hash string
|
|
Flags string
|
|
Sigil operations.Sigil
|
|
Err error
|
|
Winner operations.Winner
|
|
IsWinner bool
|
|
IsSrc bool
|
|
IsDst bool
|
|
Origin string
|
|
}
|
|
|
|
// ResultsSlice is a slice of Results (obviously)
|
|
type ResultsSlice []Results
|
|
|
|
func (rs *ResultsSlice) has(name string) bool {
|
|
for _, r := range *rs {
|
|
if r.Name == name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
var logger = operations.NewLoggerOpt()
|
|
var lock mutex.Mutex
|
|
var once mutex.Once
|
|
var ignoreListingChecksum bool
|
|
var ignoreListingModtime bool
|
|
var hashTypes map[string]hash.Type
|
|
var queueCI *fs.ConfigInfo
|
|
|
|
// allows us to get the right hashtype during the LoggerFn without knowing whether it's Path1/Path2
|
|
func getHashType(fname string) hash.Type {
|
|
ht, ok := hashTypes[fname]
|
|
if ok {
|
|
return ht
|
|
}
|
|
return hash.None
|
|
}
|
|
|
|
// FsPathIfAny handles type assertions and returns a formatted bilib.FsPath if valid, otherwise ""
|
|
func FsPathIfAny(x fs.DirEntry) string {
|
|
obj, ok := x.(fs.Object)
|
|
if x != nil && ok {
|
|
return bilib.FsPath(obj.Fs())
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func resultName(result Results, side, src, dst fs.DirEntry) string {
|
|
if side != nil {
|
|
return side.Remote()
|
|
} else if result.IsSrc && dst != nil {
|
|
return dst.Remote()
|
|
} else if src != nil {
|
|
return src.Remote()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// returns the opposite side's name, only if different
|
|
func altName(name string, src, dst fs.DirEntry) string {
|
|
if src != nil && dst != nil {
|
|
if src.Remote() != dst.Remote() {
|
|
switch name {
|
|
case src.Remote():
|
|
return dst.Remote()
|
|
case dst.Remote():
|
|
return src.Remote()
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// WriteResults is Bisync's LoggerFn
|
|
func WriteResults(ctx context.Context, sigil operations.Sigil, src, dst fs.DirEntry, err error) {
|
|
lock.Lock()
|
|
defer lock.Unlock()
|
|
|
|
opt := operations.GetLoggerOpt(ctx)
|
|
result := Results{
|
|
Sigil: sigil,
|
|
Src: FsPathIfAny(src),
|
|
Dst: FsPathIfAny(dst),
|
|
Err: err,
|
|
Origin: "sync",
|
|
}
|
|
|
|
result.Winner = operations.WinningSide(ctx, sigil, src, dst, err)
|
|
|
|
fss := []fs.DirEntry{src, dst}
|
|
for i, side := range fss {
|
|
|
|
result.Name = resultName(result, side, src, dst)
|
|
result.AltName = altName(result.Name, src, dst)
|
|
result.IsSrc = i == 0
|
|
result.IsDst = i == 1
|
|
result.Flags = "-"
|
|
if side != nil {
|
|
result.Size = side.Size()
|
|
if !ignoreListingModtime {
|
|
result.Modtime = side.ModTime(ctx).In(TZ)
|
|
}
|
|
if !ignoreListingChecksum {
|
|
sideObj, ok := side.(fs.ObjectInfo)
|
|
if ok {
|
|
result.Hash, _ = sideObj.Hash(ctx, getHashType(sideObj.Fs().Name()))
|
|
result.Hash, _ = tryDownloadHash(ctx, sideObj, result.Hash)
|
|
}
|
|
|
|
}
|
|
}
|
|
result.IsWinner = result.Winner.Obj == side
|
|
|
|
// used during resync only
|
|
if err == fs.ErrorIsDir {
|
|
if src != nil {
|
|
result.Src = src.Remote()
|
|
result.Name = src.Remote()
|
|
} else {
|
|
result.Dst = dst.Remote()
|
|
result.Name = dst.Remote()
|
|
}
|
|
result.Flags = "d"
|
|
result.Size = -1
|
|
}
|
|
|
|
prettyprint(result, "writing result", fs.LogLevelDebug)
|
|
if result.Size < 0 && result.Flags != "d" && ((queueCI.CheckSum && !downloadHash) || queueCI.SizeOnly) {
|
|
once.Do(func() {
|
|
fs.Logf(result.Name, Color(terminal.YellowFg, "Files of unknown size (such as Google Docs) do not sync reliably with --checksum or --size-only. Consider using modtime instead (the default) or --drive-skip-gdocs"))
|
|
})
|
|
}
|
|
|
|
err := json.NewEncoder(opt.JSON).Encode(result)
|
|
if err != nil {
|
|
fs.Errorf(result, "Error encoding JSON: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReadResults decodes the JSON data from WriteResults
|
|
func ReadResults(results io.Reader) []Results {
|
|
dec := json.NewDecoder(results)
|
|
var slice []Results
|
|
for {
|
|
var r Results
|
|
if err := dec.Decode(&r); err == io.EOF {
|
|
break
|
|
}
|
|
prettyprint(r, "result", fs.LogLevelDebug)
|
|
slice = append(slice, r)
|
|
}
|
|
return slice
|
|
}
|
|
|
|
// for setup code shared by both fastCopy and resyncDir
|
|
func (b *bisyncRun) preCopy(ctx context.Context) context.Context {
|
|
queueCI = fs.GetConfig(ctx)
|
|
ignoreListingChecksum = b.opt.IgnoreListingChecksum
|
|
ignoreListingModtime = !b.opt.Compare.Modtime
|
|
hashTypes = map[string]hash.Type{
|
|
b.fs1.Name(): b.opt.Compare.HashType1,
|
|
b.fs2.Name(): b.opt.Compare.HashType2,
|
|
}
|
|
logger.LoggerFn = WriteResults
|
|
overridingEqual := false
|
|
if (b.opt.Compare.Modtime && b.opt.Compare.Checksum) || b.opt.Compare.DownloadHash {
|
|
overridingEqual = true
|
|
fs.Debugf(nil, "overriding equal")
|
|
// otherwise impossible in Sync, so override Equal
|
|
ctx = b.EqualFn(ctx)
|
|
}
|
|
if b.opt.ResyncMode == PreferOlder || b.opt.ResyncMode == PreferLarger || b.opt.ResyncMode == PreferSmaller {
|
|
overridingEqual = true
|
|
fs.Debugf(nil, "overriding equal")
|
|
ctx = b.EqualFn(ctx)
|
|
}
|
|
ctxCopyLogger := operations.WithSyncLogger(ctx, logger)
|
|
if b.opt.Compare.Checksum && (b.opt.Compare.NoSlowHash || b.opt.Compare.SlowHashSyncOnly) && b.opt.Compare.SlowHashDetected {
|
|
// set here in case !b.opt.Compare.Modtime
|
|
queueCI = fs.GetConfig(ctxCopyLogger)
|
|
if b.opt.Compare.NoSlowHash {
|
|
queueCI.CheckSum = false
|
|
}
|
|
if b.opt.Compare.SlowHashSyncOnly && !overridingEqual {
|
|
queueCI.CheckSum = true
|
|
}
|
|
}
|
|
return ctxCopyLogger
|
|
}
|
|
|
|
func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.Names, queueName string) ([]Results, error) {
|
|
if b.InGracefulShutdown {
|
|
return nil, nil
|
|
}
|
|
ctx = b.preCopy(ctx)
|
|
if err := b.saveQueue(files, queueName); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctxCopy, filterCopy := filter.AddConfig(b.opt.setDryRun(ctx))
|
|
for _, file := range files.ToList() {
|
|
if err := filterCopy.AddFile(file); err != nil {
|
|
return nil, err
|
|
}
|
|
alias := b.aliases.Alias(file)
|
|
if alias != file {
|
|
if err := filterCopy.AddFile(alias); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
|
|
accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
|
|
ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
|
|
b.testFn()
|
|
err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)
|
|
prettyprint(logger, "logger", fs.LogLevelDebug)
|
|
|
|
getResults := ReadResults(logger.JSON)
|
|
fs.Debugf(nil, "Got %v results for %v", len(getResults), queueName)
|
|
|
|
lineFormat := "%s %8d %s %s %s %q\n"
|
|
for _, result := range getResults {
|
|
fs.Debugf(nil, lineFormat, result.Flags, result.Size, result.Hash, "", result.Modtime, result.Name)
|
|
}
|
|
|
|
return getResults, err
|
|
}
|
|
|
|
func (b *bisyncRun) retryFastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.Names, queueName string, results []Results, err error) ([]Results, error) {
|
|
if err != nil && b.opt.Resilient && !b.InGracefulShutdown && b.opt.Retries > 1 {
|
|
for tries := 1; tries <= b.opt.Retries; tries++ {
|
|
fs.Logf(queueName, Color(terminal.YellowFg, "Received error: %v - retrying as --resilient is set. Retry %d/%d"), err, tries, b.opt.Retries)
|
|
accounting.GlobalStats().ResetErrors()
|
|
results, err = b.fastCopy(ctx, fsrc, fdst, files, queueName)
|
|
if err == nil || b.InGracefulShutdown {
|
|
return results, err
|
|
}
|
|
}
|
|
}
|
|
return results, err
|
|
}
|
|
|
|
func (b *bisyncRun) resyncDir(ctx context.Context, fsrc, fdst fs.Fs) ([]Results, error) {
|
|
ctx = b.preCopy(ctx)
|
|
|
|
err := sync.CopyDir(ctx, fdst, fsrc, b.opt.CreateEmptySrcDirs)
|
|
prettyprint(logger, "logger", fs.LogLevelDebug)
|
|
|
|
getResults := ReadResults(logger.JSON)
|
|
fs.Debugf(nil, "Got %v results for %v", len(getResults), "resync")
|
|
|
|
return getResults, err
|
|
}
|
|
|
|
// operation should be "make" or "remove"
|
|
func (b *bisyncRun) syncEmptyDirs(ctx context.Context, dst fs.Fs, candidates bilib.Names, dirsList *fileList, results *[]Results, operation string) {
|
|
if b.InGracefulShutdown {
|
|
return
|
|
}
|
|
fs.Debugf(nil, "syncing empty dirs")
|
|
if b.opt.CreateEmptySrcDirs && (!b.opt.Resync || operation == "make") {
|
|
|
|
candidatesList := candidates.ToList()
|
|
if operation == "remove" {
|
|
// reverse the sort order to ensure we remove subdirs before parent dirs
|
|
sort.Sort(sort.Reverse(sort.StringSlice(candidatesList)))
|
|
}
|
|
|
|
for _, s := range candidatesList {
|
|
var direrr error
|
|
if dirsList.has(s) { //make sure it's a dir, not a file
|
|
r := Results{}
|
|
r.Name = s
|
|
r.Size = -1
|
|
r.Modtime = dirsList.getTime(s).In(time.UTC)
|
|
r.Flags = "d"
|
|
r.Err = nil
|
|
r.Origin = "syncEmptyDirs"
|
|
r.Winner = operations.Winner{ // note: Obj not set
|
|
Side: "src",
|
|
Err: nil,
|
|
}
|
|
|
|
rSrc := r
|
|
rDst := r
|
|
rSrc.IsSrc = true
|
|
rSrc.IsDst = false
|
|
rDst.IsSrc = false
|
|
rDst.IsDst = true
|
|
rSrc.IsWinner = true
|
|
rDst.IsWinner = false
|
|
|
|
if operation == "remove" {
|
|
// directories made empty by the sync will have already been deleted during the sync
|
|
// this just catches the already-empty ones (excluded from sync by --files-from filter)
|
|
direrr = operations.TryRmdir(ctx, dst, s)
|
|
rSrc.Sigil = operations.MissingOnSrc
|
|
rDst.Sigil = operations.MissingOnSrc
|
|
rSrc.Dst = s
|
|
rDst.Dst = s
|
|
rSrc.Winner.Side = "none"
|
|
rDst.Winner.Side = "none"
|
|
} else if operation == "make" {
|
|
direrr = operations.Mkdir(ctx, dst, s)
|
|
rSrc.Sigil = operations.MissingOnDst
|
|
rDst.Sigil = operations.MissingOnDst
|
|
rSrc.Src = s
|
|
rDst.Src = s
|
|
} else {
|
|
direrr = fmt.Errorf("invalid operation. Expected 'make' or 'remove', received '%q'", operation)
|
|
}
|
|
|
|
if direrr != nil {
|
|
fs.Debugf(nil, "Error syncing directory: %v", direrr)
|
|
} else {
|
|
*results = append(*results, rSrc, rDst)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bisyncRun) saveQueue(files bilib.Names, jobName string) error {
|
|
if !b.opt.SaveQueues {
|
|
return nil
|
|
}
|
|
queueFile := fmt.Sprintf("%s.%s.que", b.basePath, jobName)
|
|
return files.Save(queueFile)
|
|
}
|