From 115d24e1f7a9a4e6bd612c7be4e2ba691ffe81b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Mon, 16 Oct 2017 21:54:53 +0200 Subject: [PATCH] amazonclouddrive: implement DirChangeNotify Use the Changes API to invalidate cache entries. The latest retrieved checkpoint is stored in the config file to allow fast resumption after restart. --- backend/amazonclouddrive/amazonclouddrive.go | 107 ++++++++++++++++++- 1 file changed, 102 insertions(+), 5 deletions(-) diff --git a/backend/amazonclouddrive/amazonclouddrive.go b/backend/amazonclouddrive/amazonclouddrive.go index 836cf4be2..569f97bff 100644 --- a/backend/amazonclouddrive/amazonclouddrive.go +++ b/backend/amazonclouddrive/amazonclouddrive.go @@ -19,6 +19,7 @@ import ( "net/http" "path" "regexp" + "sort" "strings" "time" @@ -1206,14 +1207,110 @@ func (o *Object) MimeType() string { return "" } +// DirChangeNotify polls for changes from the remote and hands the path to the +// given function. Only changes that can be resolved to a path through the +// DirCache will handled. +// +// Automatically restarts itself in case of unexpected behaviour of the remote. +// +// Close the returned channel to stop being notified. +func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool { + checkpoint := config.FileGet(f.name, "checkpoint") + + quit := make(chan bool) + go func() { + for { + checkpoint = f.dirchangeNotifyRunner(notifyFunc, checkpoint) + if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil { + fs.Debugf(f, "Unable to save checkpoint: %v", err) + } + select { + case <-quit: + return + case <-time.After(pollInterval): + } + } + }() + return quit +} + +func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), checkpoint string) string { + var err error + var resp *http.Response + var reachedEnd bool + var csCount int + var nodeCount int + + fs.Debugf(f, "Checking for changes on remote (Checkpoint %q)", checkpoint) + err = f.pacer.CallNoRetry(func() (bool, error) { + resp, err = f.c.Changes.GetChangesFunc(&acd.ChangesOptions{ + Checkpoint: checkpoint, + IncludePurged: true, + }, func(changeSet *acd.ChangeSet, err error) error { + if err != nil { + return err + } + + pathsToClear := make([]string, 0) + csCount++ + nodeCount += len(changeSet.Nodes) + if changeSet.End { + reachedEnd = true + } + if changeSet.Checkpoint != "" { + checkpoint = changeSet.Checkpoint + } + for _, node := range changeSet.Nodes { + if path, ok := f.dirCache.GetInv(*node.Id); ok { + pathsToClear = append(pathsToClear, path) + } + + for _, parent := range node.Parents { + if path, ok := f.dirCache.GetInv(parent); ok { + pathsToClear = append(pathsToClear, path) + } + } + } + + lastNotifiedPath := "" + sort.Strings(pathsToClear) + for _, path := range pathsToClear { + if lastNotifiedPath != "" && (path == lastNotifiedPath || strings.HasPrefix(path+"/", lastNotifiedPath)) { + continue + } + lastNotifiedPath = path + notifyFunc(path) + } + + return nil + }) + return false, err + }) + fs.Debugf(f, "Got %d ChangeSets with %d Nodes", csCount, nodeCount) + + if err != nil && err != io.ErrUnexpectedEOF { + fs.Debugf(f, "Failed to get Changes: %v", err) + return checkpoint + } + + if reachedEnd { + reachedEnd = false + fs.Debugf(f, "All changes were processed. Waiting for more.") + } else if checkpoint == "" { + fs.Debugf(f, "Did not get any checkpoint, something went wrong! %+v", resp) + } + return checkpoint +} + // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) // _ fs.Copier = (*Fs)(nil) - _ fs.Mover = (*Fs)(nil) - _ fs.DirMover = (*Fs)(nil) - _ fs.DirCacheFlusher = (*Fs)(nil) - _ fs.Object = (*Object)(nil) - _ fs.MimeTyper = &Object{} + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.DirChangeNotifier = (*Fs)(nil) + _ fs.Object = (*Object)(nil) + _ fs.MimeTyper = &Object{} )