fs: update ChangeNotifier interface

This introduces a channel to the ChangeNotify function, which can be
used to update the poll-interval and cleanly exit the polling function.
This commit is contained in:
Fabian Möller 2018-08-25 21:28:57 +02:00
parent 96ce49ec4e
commit 1eec59e091
7 changed files with 103 additions and 53 deletions

View File

@ -1240,24 +1240,38 @@ func (o *Object) MimeType() string {
// Automatically restarts itself in case of unexpected behaviour of the remote. // Automatically restarts itself in case of unexpected behaviour of the remote.
// //
// Close the returned channel to stop being notified. // Close the returned channel to stop being notified.
func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
checkpoint := f.opt.Checkpoint checkpoint := f.opt.Checkpoint
quit := make(chan bool)
go func() { go func() {
var ticker *time.Ticker
var tickerC <-chan time.Time
for { for {
checkpoint = f.changeNotifyRunner(notifyFunc, checkpoint)
if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil {
fs.Debugf(f, "Unable to save checkpoint: %v", err)
}
select { select {
case <-quit: case pollInterval, ok := <-pollIntervalChan:
return if !ok {
case <-time.After(pollInterval): if ticker != nil {
ticker.Stop()
}
return
}
if pollInterval == 0 {
if ticker != nil {
ticker.Stop()
ticker, tickerC = nil, nil
}
} else {
ticker = time.NewTicker(pollInterval)
tickerC = ticker.C
}
case <-tickerC:
checkpoint = f.changeNotifyRunner(notifyFunc, checkpoint)
if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil {
fs.Debugf(f, "Unable to save checkpoint: %v", err)
}
} }
} }
}() }()
return quit
} }
func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), checkpoint string) string { func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), checkpoint string) string {

View File

@ -415,7 +415,9 @@ func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error) {
}() }()
if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil { if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil {
doChangeNotify(f.receiveChangeNotify, time.Duration(f.opt.ChunkCleanInterval)) pollInterval := make(chan time.Duration, 1)
pollInterval <- time.Duration(f.opt.ChunkCleanInterval)
doChangeNotify(f.receiveChangeNotify, pollInterval)
} }
f.features = (&fs.Features{ f.features = (&fs.Features{
@ -780,12 +782,15 @@ func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) {
// ChangeNotify can subsribe multiple callers // ChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs ChangeNotify (if it supports it) // this is coupled with the wrapped fs ChangeNotify (if it supports it)
// and also notifies other caches (i.e VFS) to clear out whenever something changes // and also notifies other caches (i.e VFS) to clear out whenever something changes
func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration) {
f.parentsForgetMu.Lock() f.parentsForgetMu.Lock()
defer f.parentsForgetMu.Unlock() defer f.parentsForgetMu.Unlock()
fs.Debugf(f, "subscribing to ChangeNotify") fs.Debugf(f, "subscribing to ChangeNotify")
f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc) f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc)
return make(chan bool) go func() {
for range pollInterval {
}
}()
} }
// Name of the remote (as passed into NewFs) // Name of the remote (as passed into NewFs)

View File

@ -165,7 +165,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
doChangeNotify := wrappedFs.Features().ChangeNotify doChangeNotify := wrappedFs.Features().ChangeNotify
if doChangeNotify != nil { if doChangeNotify != nil {
f.features.ChangeNotify = func(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { f.features.ChangeNotify = func(notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration) {
wrappedNotifyFunc := func(path string, entryType fs.EntryType) { wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
decrypted, err := f.DecryptFileName(path) decrypted, err := f.DecryptFileName(path)
if err != nil { if err != nil {
@ -174,7 +174,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
} }
notifyFunc(decrypted, entryType) notifyFunc(decrypted, entryType)
} }
return doChangeNotify(wrappedNotifyFunc, pollInterval) doChangeNotify(wrappedNotifyFunc, pollInterval)
} }
} }

View File

@ -1660,25 +1660,50 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
// Automatically restarts itself in case of unexpected behaviour of the remote. // Automatically restarts itself in case of unexpected behaviour of the remote.
// //
// Close the returned channel to stop being notified. // Close the returned channel to stop being notified.
func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
quit := make(chan bool)
go func() { go func() {
select { // get the StartPageToken early so all changes from now on get processed
case <-quit: startPageToken, err := f.changeNotifyStartPageToken()
return if err != nil {
default: fs.Infof(f, "Failed to get StartPageToken: %s", err)
for { }
f.changeNotifyRunner(notifyFunc, pollInterval) var ticker *time.Ticker
fs.Debugf(f, "Notify listener service ran into issues, restarting shortly.") var tickerC <-chan time.Time
time.Sleep(pollInterval) for {
select {
case pollInterval, ok := <-pollIntervalChan:
if !ok {
if ticker != nil {
ticker.Stop()
}
return
}
if ticker != nil {
ticker.Stop()
ticker, tickerC = nil, nil
}
if pollInterval != 0 {
ticker = time.NewTicker(pollInterval)
tickerC = ticker.C
}
case <-tickerC:
if startPageToken == "" {
startPageToken, err = f.changeNotifyStartPageToken()
if err != nil {
fs.Infof(f, "Failed to get StartPageToken: %s", err)
continue
}
}
fs.Debugf(f, "Checking for changes on remote")
startPageToken, err = f.changeNotifyRunner(notifyFunc, startPageToken)
if err != nil {
fs.Infof(f, "Change notify listener failure: %s", err)
}
} }
} }
}() }()
return quit
} }
func (f *Fs) changeNotifyStartPageToken() (pageToken string, err error) {
func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) {
var err error
var startPageToken *drive.StartPageToken var startPageToken *drive.StartPageToken
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
startPageToken, err = f.svc.Changes.GetStartPageToken(). startPageToken, err = f.svc.Changes.GetStartPageToken().
@ -1687,13 +1712,14 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter
return shouldRetry(err) return shouldRetry(err)
}) })
if err != nil { if err != nil {
fs.Debugf(f, "Failed to get StartPageToken: %v", err)
return return
} }
pageToken := startPageToken.StartPageToken return startPageToken.StartPageToken, nil
}
func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), startPageToken string) (newStartPageToken string, err error) {
pageToken := startPageToken
for { for {
fs.Debugf(f, "Checking for changes on remote")
var changeList *drive.ChangeList var changeList *drive.ChangeList
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
@ -1711,7 +1737,6 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter
return shouldRetry(err) return shouldRetry(err)
}) })
if err != nil { if err != nil {
fs.Debugf(f, "Failed to get Changes: %v", err)
return return
} }
@ -1763,15 +1788,12 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter
notifyFunc(entry.path, entry.entryType) notifyFunc(entry.path, entry.entryType)
} }
if changeList.NewStartPageToken != "" { switch {
pageToken = changeList.NewStartPageToken case changeList.NewStartPageToken != "":
fs.Debugf(f, "All changes were processed. Waiting for more.") return changeList.NewStartPageToken, nil
time.Sleep(pollInterval) case changeList.NextPageToken != "":
} else if changeList.NextPageToken != "" {
pageToken = changeList.NextPageToken pageToken = changeList.NextPageToken
fs.Debugf(f, "There are more changes pending, checking now.") default:
} else {
fs.Debugf(f, "Did not get any page token, something went wrong! %+v", changeList)
return return
} }
} }

View File

@ -424,7 +424,7 @@ type Features struct {
// ChangeNotify calls the passed function with a path // ChangeNotify calls the passed function with a path
// that has had changes. If the implementation // that has had changes. If the implementation
// uses polling, it should adhere to the given interval. // uses polling, it should adhere to the given interval.
ChangeNotify func(func(string, EntryType), time.Duration) chan bool ChangeNotify func(func(string, EntryType), <-chan time.Duration)
// UnWrap returns the Fs that this Fs is wrapping // UnWrap returns the Fs that this Fs is wrapping
UnWrap func() Fs UnWrap func() Fs
@ -724,7 +724,13 @@ type ChangeNotifier interface {
// ChangeNotify calls the passed function with a path // ChangeNotify calls the passed function with a path
// that has had changes. If the implementation // that has had changes. If the implementation
// uses polling, it should adhere to the given interval. // uses polling, it should adhere to the given interval.
ChangeNotify(func(string, EntryType), time.Duration) chan bool // At least one value will be written to the channel,
// specifying the initial value and updated values might
// follow. A 0 Duration should pause the polling.
// The ChangeNotify implemantion must empty the channel
// regulary. When the channel gets closed, the implemantion
// should stop polling and release resources.
ChangeNotify(func(string, EntryType), <-chan time.Duration)
} }
// UnWrapper is an optional interfaces for Fs // UnWrapper is an optional interfaces for Fs

View File

@ -770,9 +770,10 @@ func Run(t *testing.T, opt *Opt) {
err := operations.Mkdir(remote, "dir") err := operations.Mkdir(remote, "dir")
require.NoError(t, err) require.NoError(t, err)
pollInterval := make(chan time.Duration)
dirChanges := []string{} dirChanges := []string{}
objChanges := []string{} objChanges := []string{}
quitChannel := doChangeNotify(func(x string, e fs.EntryType) { doChangeNotify(func(x string, e fs.EntryType) {
fs.Debugf(nil, "doChangeNotify(%q, %+v)", x, e) fs.Debugf(nil, "doChangeNotify(%q, %+v)", x, e)
if strings.HasPrefix(x, file1.Path[:5]) || strings.HasPrefix(x, file2.Path[:5]) { if strings.HasPrefix(x, file1.Path[:5]) || strings.HasPrefix(x, file2.Path[:5]) {
fs.Debugf(nil, "Ignoring notify for file1 or file2: %q, %v", x, e) fs.Debugf(nil, "Ignoring notify for file1 or file2: %q, %v", x, e)
@ -783,8 +784,9 @@ func Run(t *testing.T, opt *Opt) {
} else if e == fs.EntryObject { } else if e == fs.EntryObject {
objChanges = append(objChanges, x) objChanges = append(objChanges, x)
} }
}, time.Second) }, pollInterval)
defer func() { close(quitChannel) }() defer func() { close(pollInterval) }()
pollInterval <- time.Second
var dirs []string var dirs []string
for _, idx := range []int{1, 3, 2} { for _, idx := range []int{1, 3, 2} {

View File

@ -176,6 +176,7 @@ type VFS struct {
usageMu sync.Mutex usageMu sync.Mutex
usageTime time.Time usageTime time.Time
usage *fs.Usage usage *fs.Usage
pollChan chan time.Duration
} }
// Options is options for creating the vfs // Options is options for creating the vfs
@ -223,13 +224,13 @@ func New(f fs.Fs, opt *Options) *VFS {
// Create root directory // Create root directory
vfs.root = newDir(vfs, f, nil, fsDir) vfs.root = newDir(vfs, f, nil, fsDir)
// Start polling if required // Start polling function
if vfs.Opt.PollInterval > 0 { if do := vfs.f.Features().ChangeNotify; do != nil {
if do := vfs.f.Features().ChangeNotify; do != nil { vfs.pollChan = make(chan time.Duration)
do(vfs.notifyFunc, vfs.Opt.PollInterval) do(vfs.notifyFunc, vfs.pollChan)
} else { vfs.pollChan <- vfs.Opt.PollInterval
fs.Infof(f, "poll-interval is not supported by this remote") } else {
} fs.Infof(f, "poll-interval is not supported by this remote")
} }
vfs.SetCacheMode(vfs.Opt.CacheMode) vfs.SetCacheMode(vfs.Opt.CacheMode)