// Implementation of sync/copy/move package fs import ( "sync" "time" "github.com/pkg/errors" ) type syncCopyMove struct { // parameters fdst Fs fsrc Fs Delete bool DoMove bool dir string // internal state noTraverse bool // if set don't trafevers the dst deleteBefore bool // set if we must delete objects before copying dstFiles map[string]Object // dst files, only used if Delete srcFiles map[string]Object // src files, only used if deleteBefore srcFilesChan chan Object // passes src objects srcFilesResult chan error // error result of src listing dstFilesResult chan error // error result of dst listing abort chan struct{} // signal to abort the copiers checkerWg sync.WaitGroup // wait for checkers toBeChecked ObjectPairChan // checkers channel copierWg sync.WaitGroup // wait for copiers toBeUploaded ObjectPairChan // copiers channel errorMu sync.Mutex // Mutex covering the errors variables err error // normal error from copy process noRetryErr error // error with NoRetry set fatalErr error // fatal error } func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { s := &syncCopyMove{ fdst: fdst, fsrc: fsrc, Delete: Delete, DoMove: DoMove, dir: "", srcFilesChan: make(chan Object, Config.Checkers+Config.Transfers), srcFilesResult: make(chan error, 1), dstFilesResult: make(chan error, 1), noTraverse: Config.NoTraverse, abort: make(chan struct{}), toBeChecked: make(ObjectPairChan, Config.Transfers), toBeUploaded: make(ObjectPairChan, Config.Transfers), deleteBefore: Delete && Config.DeleteBefore, } if s.noTraverse && s.Delete { Debug(s.fdst, "Ignoring --no-traverse with sync") s.noTraverse = false } return s } // Check to see if have set the abort flag func (s *syncCopyMove) aborting() bool { select { case <-s.abort: return true default: } return false } // This reads the source files from s.srcFiles into srcFilesChan then // closes it // // It returns the final result of the read into s.srcFilesResult func (s *syncCopyMove) readSrcUsingMap() { outer: for _, o := range s.srcFiles { if s.aborting() { break outer } select { case s.srcFilesChan <- o: case <-s.abort: break outer } } close(s.srcFilesChan) s.srcFilesResult <- nil } // This reads the source files into srcFilesChan then closes it // // It returns the final result of the read into s.srcFilesResult func (s *syncCopyMove) readSrcUsingChan() { err := readFilesFn(s.fsrc, false, s.dir, func(o Object) error { if s.aborting() { return ErrorListAborted } select { case s.srcFilesChan <- o: case <-s.abort: return ErrorListAborted } return nil }) close(s.srcFilesChan) if err != nil { err = errors.Wrapf(err, "error listing source: %s", s.fsrc) } s.srcFilesResult <- err } // This reads the destination files in into dstFiles // // It returns the final result of the read into s.dstFilesResult func (s *syncCopyMove) readDstFiles() { var err error s.dstFiles, err = readFilesMap(s.fdst, Config.Filter.DeleteExcluded, s.dir) s.dstFilesResult <- err } // Check to see if src needs to be copied to dst and if so puts it in out // // Returns a flag which indicates whether the file needs to be transferred or not. func (s *syncCopyMove) checkOne(pair ObjectPair) bool { src, dst := pair.src, pair.dst if dst == nil { Debug(src, "Couldn't find file - need to transfer") return true } // If we should ignore existing files, don't transfer if Config.IgnoreExisting { Debug(src, "Destination exists, skipping") return false } // If we should upload unconditionally if Config.IgnoreTimes { Debug(src, "Transferring unconditionally as --ignore-times is in use") return true } // If UpdateOlder is in effect, skip if dst is newer than src if Config.UpdateOlder { srcModTime := src.ModTime() dstModTime := dst.ModTime() dt := dstModTime.Sub(srcModTime) // If have a mutually agreed precision then use that modifyWindow := Config.ModifyWindow if modifyWindow == ModTimeNotSupported { // Otherwise use 1 second as a safe default as // the resolution of the time a file was // uploaded. modifyWindow = time.Second } switch { case dt >= modifyWindow: Debug(src, "Destination is newer than source, skipping") return false case dt <= -modifyWindow: Debug(src, "Destination is older than source, transferring") default: if src.Size() == dst.Size() { Debug(src, "Destination mod time is within %v of source and sizes identical, skipping", modifyWindow) return false } Debug(src, "Destination mod time is within %v of source but sizes differ, transferring", modifyWindow) } } else { // Check to see if changed or not if Equal(src, dst) { Debug(src, "Unchanged skipping") return false } } return true } // This checks the types of errors returned while copying files func (s *syncCopyMove) processError(err error) { if err == nil { return } s.errorMu.Lock() defer s.errorMu.Unlock() switch { case IsFatalError(err): if !s.aborting() { close(s.abort) } s.fatalErr = err case IsNoRetryError(err): s.noRetryErr = err default: s.err = err } } // pairChecker reads Objects~s on in send to out if they need transferring. // // FIXME potentially doing lots of hashes at once func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) { defer wg.Done() for { if s.aborting() { return } select { case pair, ok := <-in: if !ok { return } src := pair.src Stats.Checking(src.Remote()) // Check to see if can store this if src.Storable() { if s.checkOne(pair) { out <- pair } else { // If moving need to delete the files we don't need to copy if s.DoMove { // Delete src if no error on copy s.processError(DeleteFile(src)) } } } Stats.DoneChecking(src.Remote()) case <-s.abort: return } } } // pairCopier reads Objects on in and copies them. func (s *syncCopyMove) pairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() for { if s.aborting() { return } select { case pair, ok := <-in: if !ok { return } src := pair.src Stats.Transferring(src.Remote()) var err error if Config.DryRun { Log(src, "Not copying as --dry-run") } else { err = Copy(fdst, pair.dst, src) s.processError(err) } Stats.DoneTransferring(src.Remote(), err == nil) case <-s.abort: return } } } // pairMover reads Objects on in and moves them if possible, or copies // them if not func (s *syncCopyMove) pairMover(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() // See if we have Move available fdstMover, haveMover := fdst.(Mover) for { if s.aborting() { return } select { case pair, ok := <-in: if !ok { return } transferredOK := true src := pair.src dst := pair.dst Stats.Transferring(src.Remote()) doCopy := func() { // Copy dst <- src err := Copy(fdst, dst, src) s.processError(err) if err != nil { transferredOK = false ErrorLog(src, "Not deleting as copy failed: %v", err) } else { // Delete src if no error on copy s.processError(DeleteFile(src)) } } if Config.DryRun { Log(src, "Not moving as --dry-run") } else if haveMover && src.Fs().Name() == fdst.Name() { // Delete destination if it exists if dst != nil { s.processError(DeleteFile(dst)) } // Move dst <- src _, err := fdstMover.Move(src, src.Remote()) if err != nil { // If this remote can't do moves, // then set the flag and copy if err == ErrorCantMove { Debug(src, "Can't move, switching to copy") haveMover = false doCopy() } else { Stats.Error() ErrorLog(dst, "Couldn't move: %v", err) s.processError(err) transferredOK = false } } else { Debug(src, "Moved") } } else { doCopy() } Stats.DoneTransferring(src.Remote(), transferredOK) case <-s.abort: return } } } // This starts the background checkers. func (s *syncCopyMove) startCheckers() { s.checkerWg.Add(Config.Checkers) for i := 0; i < Config.Checkers; i++ { go s.pairChecker(s.toBeChecked, s.toBeUploaded, &s.checkerWg) } } // This stops the background checkers func (s *syncCopyMove) stopCheckers() { close(s.toBeChecked) Log(s.fdst, "Waiting for checks to finish") s.checkerWg.Wait() } // This starts the background transfers func (s *syncCopyMove) startTransfers() { s.copierWg.Add(Config.Transfers) for i := 0; i < Config.Transfers; i++ { if s.DoMove { go s.pairMover(s.toBeUploaded, s.fdst, &s.copierWg) } else { go s.pairCopier(s.toBeUploaded, s.fdst, &s.copierWg) } } } // This stops the background transfers func (s *syncCopyMove) stopTransfers() { close(s.toBeUploaded) Log(s.fdst, "Waiting for transfers to finish") s.copierWg.Wait() } // This deletes the files in the dstFiles map. If checkSrcMap is set // then it checks to see if they exist first in srcFiles the source // file map, otherwise it unconditionally deletes them. If // checkSrcMap is clear then it assumes that the any source files that // have been found have been removed from dstFiles already. func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error { if Stats.Errored() { ErrorLog(s.fdst, "%v", ErrorNotDeleting) return ErrorNotDeleting } // Delete the spare files toDelete := make(ObjectsChan, Config.Transfers) go func() { for remote, o := range s.dstFiles { if checkSrcMap { _, exists := s.srcFiles[remote] if exists { continue } } if s.aborting() { break } toDelete <- o } close(toDelete) }() return DeleteFiles(toDelete) } // Syncs fsrc into fdst // // If Delete is true then it deletes any files in fdst that aren't in fsrc // // If DoMove is true then files will be moved instead of copied // // dir is the start directory, "" for root func (s *syncCopyMove) run() error { if Same(s.fdst, s.fsrc) { ErrorLog(s.fdst, "Nothing to do as source and destination are the same") return nil } err := Mkdir(s.fdst) if err != nil { return err } // Start reading dstFiles if required if !s.noTraverse { go s.readDstFiles() } // If s.deleteBefore then we need to read the whole source map first if s.deleteBefore { // Read source files into the map s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir) if err != nil { return err } // Pump the map into s.srcFilesChan go s.readSrcUsingMap() } else { go s.readSrcUsingChan() } // Wait for dstfiles to finish reading if we were reading them // and report any errors if !s.noTraverse { err = <-s.dstFilesResult if err != nil { return err } } // Delete files first if required // Have dstFiles and srcFiles complete at this point if s.deleteBefore { err = s.deleteFiles(true) if err != nil { return err } } // Start background checking and transferring pipeline s.startCheckers() s.startTransfers() // Do the transfers for src := range s.srcFilesChan { remote := src.Remote() var dst Object if s.noTraverse { var err error dst, err = s.fdst.NewObject(remote) if err != nil { dst = nil if err != ErrorObjectNotFound { Debug(src, "Error making NewObject: %v", err) } } } else { dst = s.dstFiles[remote] // Remove file from s.dstFiles because it exists in srcFiles delete(s.dstFiles, remote) } if dst != nil { s.toBeChecked <- ObjectPair{src, dst} } else { // No need to check since doesn't exist s.toBeUploaded <- ObjectPair{src, nil} } } // Stop background checking and transferring pipeline s.stopCheckers() s.stopTransfers() // Retrieve the delayed error from the source listing goroutine err = <-s.srcFilesResult // Delete files during or after if s.Delete && (Config.DeleteDuring || Config.DeleteAfter) { if err != nil { ErrorLog(s.fdst, "%v", ErrorNotDeleting) } else { err = s.deleteFiles(false) } } // Return errors in the precedence // fatalErr // error from above // error from a copy // noRetryErr s.processError(err) if s.fatalErr != nil { return s.fatalErr } if s.err != nil { return s.err } return s.noRetryErr } // Sync fsrc into fdst func Sync(fdst, fsrc Fs) error { return newSyncCopyMove(fdst, fsrc, true, false).run() } // CopyDir copies fsrc into fdst func CopyDir(fdst, fsrc Fs) error { return newSyncCopyMove(fdst, fsrc, false, false).run() } // moveDir moves fsrc into fdst func moveDir(fdst, fsrc Fs) error { return newSyncCopyMove(fdst, fsrc, false, true).run() } // MoveDir moves fsrc into fdst func MoveDir(fdst, fsrc Fs) error { if Same(fdst, fsrc) { ErrorLog(fdst, "Nothing to do as source and destination are the same") return nil } // The two remotes mustn't overlap if Overlapping(fdst, fsrc) { err := ErrorCantMoveOverlapping ErrorLog(fdst, "%v", err) return err } // First attempt to use DirMover if exists, same Fs and no filters are active if fdstDirMover, ok := fdst.(DirMover); ok && fsrc.Name() == fdst.Name() && Config.Filter.InActive() { if Config.DryRun { Log(fdst, "Not doing server side directory move as --dry-run") return nil } Debug(fdst, "Using server side directory move") err := fdstDirMover.DirMove(fsrc) switch err { case ErrorCantDirMove, ErrorDirExists: Debug(fdst, "Server side directory move failed - fallback to file moves: %v", err) case nil: Debug(fdst, "Server side directory move succeeded") return nil default: Stats.Error() ErrorLog(fdst, "Server side directory move failed: %v", err) return err } } // Otherwise move the files one by one return moveDir(fdst, fsrc) }