diff --git a/fs/sync.go b/fs/sync.go index c7e2e3111..e2d2bf8d4 100644 --- a/fs/sync.go +++ b/fs/sync.go @@ -10,6 +10,8 @@ import ( "github.com/pkg/errors" ) +var oldSyncMethod = BoolP("old-sync-method", "", false, "Temporary flag to select old sync method") + type syncCopyMove struct { // parameters fdst Fs @@ -19,6 +21,8 @@ type syncCopyMove struct { dir string // internal state noTraverse bool // if set don't trafevers the dst + deletersWg sync.WaitGroup // waits for the background deleter go routine + deleteFilesCh chan Object // channel feeding objects to the background deleter trackRenames bool // set if we should do server side renames dstFilesMu sync.Mutex // protect dstFiles dstFiles map[string]Object // dst files, always filled @@ -61,6 +65,7 @@ func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCo abort: make(chan struct{}), toBeChecked: make(ObjectPairChan, Config.Transfers), toBeUploaded: make(ObjectPairChan, Config.Transfers), + deleteFilesCh: make(chan Object, Config.Checkers), trackRenames: Config.TrackRenames, commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), toBeRenamed: make(ObjectPairChan, Config.Transfers), @@ -450,6 +455,28 @@ func (s *syncCopyMove) stopTrackRenames() { s.trackRenamesWg.Wait() } +// startDeleters starts the background deletion of files - it does nothing unless the delete mode is DeleteModeDuring or DeleteModeOnly +func (s *syncCopyMove) startDeleters() { + if s.deleteMode != DeleteModeDuring && s.deleteMode != DeleteModeOnly { + return + } + s.deletersWg.Add(1) + go func() { + defer s.deletersWg.Done() + err := deleteFilesWithBackupDir(s.deleteFilesCh, s.backupDir) + s.processError(err) + }() +} + +// stopDeleters closes deleteFilesCh and waits for the background deleter to finish - it does nothing unless the delete mode is DeleteModeDuring or DeleteModeOnly +func (s *syncCopyMove) stopDeleters() { + if s.deleteMode != DeleteModeDuring && s.deleteMode != DeleteModeOnly { + return + } + close(s.deleteFilesCh) + s.deletersWg.Wait() +} + // This deletes the files in the dstFiles map. 
If checkSrcMap is set // then it checks to see if they exist first in srcFiles the source // file map, otherwise it unconditionally deletes them. If @@ -598,7 +625,7 @@ func (s *syncCopyMove) tryRename(src Object) bool { // If DoMove is true then files will be moved instead of copied // // dir is the start directory, "" for root -func (s *syncCopyMove) run() error { +func (s *syncCopyMove) runRecursive() error { if Same(s.fdst, s.fsrc) { ErrorLog(s.fdst, "Nothing to do as source and destination are the same") return nil @@ -721,6 +748,298 @@ func (s *syncCopyMove) run() error { return s.currentError() } +// listDirJob describes a directory listing that needs to be done +type listDirJob struct { + remote string + srcDepth int + dstDepth int + noSrc bool + noDst bool +} + +// runDirAtATime syncs fsrc into fdst, listing the source and destination one directory at a time +// +// If Delete is true then it deletes any files in fdst that aren't in fsrc +// +// If DoMove is true then files will be moved instead of copied +// +// dir is the start directory, "" for root +func (s *syncCopyMove) runDirAtATime() error { + srcDepth := Config.MaxDepth + if srcDepth < 0 { + srcDepth = MaxLevel + } + dstDepth := srcDepth + if Config.Filter.DeleteExcluded { + dstDepth = MaxLevel + } + + if Same(s.fdst, s.fsrc) { + ErrorLog(s.fdst, "Nothing to do as source and destination are the same") + return nil + } + + // Make the destination directory + err := Mkdir(s.fdst, s.dir) + if err != nil { + return err + } + + // Start background checking and transferring pipeline + s.startCheckers() + s.startRenamers() + s.startTransfers() + s.startDeleters() + s.dstFiles = make(map[string]Object) + + // Start Config.Checkers directory listing go routines, all reading jobs from in + var wg sync.WaitGroup // sync closing of go routines + var traversing sync.WaitGroup // running directory traversals + in := make(chan listDirJob, Config.Checkers) + s.startTrackRenames() + for i := 0; i < Config.Checkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + if s.aborting() { + return + } + select { + case 
job, ok := <-in: + if !ok { + return + } + jobs := s._runDirAtATime(job) + if len(jobs) > 0 { + traversing.Add(len(jobs)) + go func() { + // Now we have traversed this directory, send these + // jobs off for traversal in the background + for _, newJob := range jobs { + in <- newJob + } + }() + } + traversing.Done() + case <-s.abort: + return + } + } + }() + } + + // Start the process by queueing the root directory + traversing.Add(1) + in <- listDirJob{ + remote: s.dir, + srcDepth: srcDepth - 1, + dstDepth: dstDepth - 1, + } + traversing.Wait() // NOTE(review): workers that return on s.abort never call traversing.Done() for jobs still queued - confirm this cannot block forever + close(in) + wg.Wait() + + s.stopTrackRenames() + if s.trackRenames { + // Build the map of the remaining dstFiles by hash + s.makeRenameMap() + // Attempt renames for all the files which don't have a matching dst + for _, src := range s.renameCheck { + s.toBeRenamed <- ObjectPair{src, nil} + } + } + + // Stop background checking and transferring pipeline + s.stopCheckers() + s.stopRenamers() + s.stopTransfers() + s.stopDeleters() + + // Delete files after, but only if there were no errors + if s.deleteMode == DeleteModeAfter { + if s.currentError() != nil { + ErrorLog(s.fdst, "%v", ErrorNotDeleting) + } else { + s.processError(s.deleteFiles(false)) + } + } + return s.currentError() +} + +// dstOnly handles an entry which is in the destination only - files are deleted or recorded for deletion according to s.deleteMode, directories are queued as new listing jobs +func (s *syncCopyMove) dstOnly(dst BasicInfo, job listDirJob, jobs *[]listDirJob) { + if s.deleteMode == DeleteModeOff { + return + } + switch x := dst.(type) { + case Object: + switch s.deleteMode { + case DeleteModeAfter: + // record object as needs deleting + s.dstFilesMu.Lock() + s.dstFiles[x.Remote()] = x + s.dstFilesMu.Unlock() + case DeleteModeDuring, DeleteModeOnly: + s.deleteFilesCh <- x + default: + panic(fmt.Sprintf("unexpected delete mode %d", s.deleteMode)) + } + case *Dir: + // Do the same thing to the entire contents of the directory + if job.dstDepth > 0 { + *jobs = append(*jobs, listDirJob{ + remote: dst.Remote(), + dstDepth: job.dstDepth - 1, + noSrc: true, + }) + } + default: + panic("Bad object in DirEntries") + + } +} + +// Have an 
object which is in the source only
+func (s *syncCopyMove) srcOnly(src BasicInfo, job listDirJob, jobs *[]listDirJob) {
+	if s.deleteMode == DeleteModeOnly {
+		return
+	}
+	switch x := src.(type) {
+	case Object:
+		if s.trackRenames {
+			// Save object to check for a rename later
+			s.trackRenamesCh <- x
+		} else {
+			// No need to check since doesn't exist
+			s.toBeUploaded <- ObjectPair{x, nil}
+		}
+	case *Dir:
+		// Do the same thing to the entire contents of the directory
+		if job.srcDepth > 0 {
+			*jobs = append(*jobs, listDirJob{
+				remote:   src.Remote(),
+				srcDepth: job.srcDepth - 1,
+				noDst:    true,
+			})
+		}
+	default:
+		panic("Bad object in DirEntries")
+	}
+}
+
+// transfer handles an entry present in both src and dst: file pairs are queued on toBeChecked, directory pairs are appended to jobs, and a file/directory mismatch is reported via processError
+func (s *syncCopyMove) transfer(dst, src BasicInfo, job listDirJob, jobs *[]listDirJob) {
+	switch srcX := src.(type) {
+	case Object:
+		if s.deleteMode == DeleteModeOnly {
+			return
+		}
+		dstX, ok := dst.(Object)
+		if ok {
+			s.toBeChecked <- ObjectPair{srcX, dstX}
+		} else {
+			// FIXME src is file, dst is directory
+			err := errors.New("can't overwrite directory with file")
+			ErrorLog(srcX, "%v", err)
+			s.processError(err)
+		}
+	case *Dir:
+		// Do the same thing to the entire contents of the directory
+		_, ok := dst.(*Dir)
+		if ok {
+			if job.srcDepth > 0 && job.dstDepth > 0 {
+				*jobs = append(*jobs, listDirJob{
+					remote:   src.Remote(),
+					srcDepth: job.srcDepth - 1,
+					dstDepth: job.dstDepth - 1,
+				})
+			}
+		} else {
+			// FIXME src is dir, dst is file - log against dst itself as the failed type assertion only yields a nil *Dir
+			err := errors.New("can't overwrite file with directory")
+			ErrorLog(dst, "%v", err)
+			s.processError(err)
+		}
+	default:
+		panic("Bad object in DirEntries")
+	}
+}
+
+// _runDirAtATime lists job.remote on src and dst, matches up the sorted entries and returns the sub directory jobs found - it reports errors using processError
+func (s *syncCopyMove) _runDirAtATime(job listDirJob) (jobs []listDirJob) {
+	var (
+		srcList, dstList       DirEntries
+		srcListErr, dstListErr error
+		wg                     sync.WaitGroup
+	)
+
+	// List the src and dst directories concurrently, skipping a side if the job says it has none
+	if !job.noSrc {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			srcList, srcListErr = ListDirSorted(s.fsrc, false, job.remote)
+		}()
+	}
+	if !job.noDst {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			dstList, dstListErr = ListDirSorted(s.fdst, Config.Filter.DeleteExcluded, job.remote)
+		}()
+	}
+
+	// Wait for listings to complete and report errors
+	wg.Wait()
+	if srcListErr != nil {
+		s.processError(errors.Wrapf(srcListErr, "error reading source directory %q", job.remote))
+		return nil
+	}
+	if dstListErr == ErrorDirNotFound {
+		// Destination directory not found - carry on with an empty dstList and copy the stuff anyway
+	} else if dstListErr != nil {
+		s.processError(errors.Wrapf(dstListErr, "error reading destination directory %q", job.remote))
+		return nil
+	}
+
+	// Process the two listings, matching up the items in the two sorted slices
+	for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 {
+		if s.aborting() {
+			return nil
+		}
+		var src, dst BasicInfo
+		if iSrc < len(srcList) {
+			src = srcList[iSrc]
+		}
+		if iDst < len(dstList) {
+			dst = dstList[iDst]
+		}
+		if src == nil && dst == nil {
+			break
+		}
+		if src != nil && dst != nil {
+			if src.Remote() < dst.Remote() {
+				dst = nil
+				iDst-- // retry the dst
+			} else if src.Remote() > dst.Remote() {
+				src = nil
+				iSrc-- // retry the src
+			}
+		}
+		// Debug(nil, "src = %v, dst = %v", src, dst)
+		switch {
+		case src == nil:
+			s.dstOnly(dst, job, &jobs)
+		case dst == nil:
+			s.srcOnly(src, job, &jobs)
+		default:
+			s.transfer(dst, src, job, &jobs)
+		}
+	}
+	return jobs
+}
+
 // Syncs fsrc into fdst
 //
 // If Delete is true then it deletes any files in fdst that aren't in fsrc
@@ -732,11 +1051,31 @@ func runSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) error {
 	if deleteMode != DeleteModeOff && DoMove {
 		return errors.New("can't delete and move at the same time")
 	}
+	// Run an extra pass to delete only - keyed off the deleteMode parameter so callers passing DeleteModeOff never delete
+	if !*oldSyncMethod && deleteMode == DeleteModeBefore {
+		if Config.TrackRenames {
+			return errors.New("can't use --delete-before with --track-renames")
+		}
+		// only delete stuff in this pass
+		do, err := newSyncCopyMove(fdst, fsrc,
DeleteModeOnly, false) + if err != nil { + return err + } + err = do.runDirAtATime() + if err != nil { + return err + } + // The deletes are done, so the next pass does a copy only + deleteMode = DeleteModeOff + } do, err := newSyncCopyMove(fdst, fsrc, deleteMode, DoMove) if err != nil { return err } - return do.run() + if *oldSyncMethod { + return do.runRecursive() + } + return do.runDirAtATime() } // Sync fsrc into fdst diff --git a/fs/sync_test.go b/fs/sync_test.go index 975835b6c..58747bf6d 100644 --- a/fs/sync_test.go +++ b/fs/sync_test.go @@ -457,6 +457,23 @@ func TestSyncAfterRemovingAFileAndAddingAFile(t *testing.T) { fstest.CheckItems(t, r.fremote, file1, file3) } +// Sync after removing a file and adding a file, each in its own sub directory +func TestSyncAfterRemovingAFileAndAddingAFileSubDir(t *testing.T) { + r := NewRun(t) + defer r.Finalise() + file1 := r.WriteFile("a/potato2", "------------------------------------------------------------", t1) + file2 := r.WriteObject("b/potato", "SMALLER BUT SAME DATE", t2) + file3 := r.WriteBoth("c/non empty space", "AhHa!", t2) + fstest.CheckItems(t, r.fremote, file2, file3) + fstest.CheckItems(t, r.flocal, file1, file3) + + fs.Stats.ResetCounters() + err := fs.Sync(r.fremote, r.flocal) + require.NoError(t, err) + fstest.CheckItems(t, r.flocal, file1, file3) + fstest.CheckItems(t, r.fremote, file1, file3) +} + // Sync after removing a file and adding a file with IO Errors func TestSyncAfterRemovingAFileAndAddingAFileWithErrors(t *testing.T) { r := NewRun(t)