From a603efeaf4c3328640901ce29cd4d9505316fe7c Mon Sep 17 00:00:00 2001 From: David Sze Date: Thu, 24 Aug 2023 23:40:46 -0400 Subject: [PATCH] box: add polling support --- backend/box/api/types.go | 64 +++++++-- backend/box/box.go | 290 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 326 insertions(+), 28 deletions(-) diff --git a/backend/box/api/types.go b/backend/box/api/types.go index 6ff87761c..f8e92638f 100644 --- a/backend/box/api/types.go +++ b/backend/box/api/types.go @@ -63,7 +63,7 @@ var _ error = (*Error)(nil) // ItemFields are the fields needed for FileInfo var ItemFields = "type,id,sequence_id,etag,sha1,name,size,created_at,modified_at,content_created_at,content_modified_at,item_status,shared_link,owned_by" -// Types of things in Item +// Types of things in Item/ItemMini const ( ItemTypeFolder = "folder" ItemTypeFile = "file" @@ -72,20 +72,31 @@ const ( ItemStatusDeleted = "deleted" ) +// ItemMini is a subset of the elements in a full Item returned by some API calls +type ItemMini struct { + Type string `json:"type"` + ID string `json:"id"` + SequenceID int64 `json:"sequence_id,string"` + Etag string `json:"etag"` + SHA1 string `json:"sha1"` + Name string `json:"name"` +} + // Item describes a folder or a file as returned by Get Folder Items and others type Item struct { - Type string `json:"type"` - ID string `json:"id"` - SequenceID string `json:"sequence_id"` - Etag string `json:"etag"` - SHA1 string `json:"sha1"` - Name string `json:"name"` - Size float64 `json:"size"` // box returns this in xEyy format for very large numbers - see #2261 - CreatedAt Time `json:"created_at"` - ModifiedAt Time `json:"modified_at"` - ContentCreatedAt Time `json:"content_created_at"` - ContentModifiedAt Time `json:"content_modified_at"` - ItemStatus string `json:"item_status"` // active, trashed if the file has been moved to the trash, and deleted if the file has been permanently deleted + Type string `json:"type"` + ID string `json:"id"` + SequenceID int64 `json:"sequence_id,string"` + Etag string `json:"etag"` + SHA1 string `json:"sha1"` + Name string `json:"name"` + Size float64 `json:"size"` // box returns this in xEyy format for very large numbers - see #2261 + CreatedAt Time `json:"created_at"` + ModifiedAt Time `json:"modified_at"` + ContentCreatedAt Time `json:"content_created_at"` + ContentModifiedAt Time `json:"content_modified_at"` + ItemStatus string `json:"item_status"` // active, trashed if the file has been moved to the trash, and deleted if the file has been permanently deleted + Parent ItemMini `json:"parent"` SharedLink struct { URL string `json:"url,omitempty"` Access string `json:"access,omitempty"` @@ -281,3 +292,30 @@ type User struct { Address string `json:"address"` AvatarURL string `json:"avatar_url"` } + +// FileTreeChangeEventTypes are the events that can require cache invalidation +var FileTreeChangeEventTypes = map[string]struct{}{ + "ITEM_COPY": {}, + "ITEM_CREATE": {}, + "ITEM_MAKE_CURRENT_VERSION": {}, + "ITEM_MODIFY": {}, + "ITEM_MOVE": {}, + "ITEM_RENAME": {}, + "ITEM_TRASH": {}, + "ITEM_UNDELETE_VIA_TRASH": {}, + "ITEM_UPLOAD": {}, +} + +// Event is an array element in the response returned from /events +type Event struct { + EventType string `json:"event_type"` + EventID string `json:"event_id"` + Source Item `json:"source"` +} + +// Events is returned from /events +type Events struct { + ChunkSize int64 `json:"chunk_size"` + Entries []Event `json:"entries"` + NextStreamPosition int64 `json:"next_stream_position"` +} diff --git a/backend/box/box.go b/backend/box/box.go index 7e34ae7bb..d2853480a 100644 --- a/backend/box/box.go +++ b/backend/box/box.go @@ -264,17 +264,26 @@ type Options struct { OwnedBy string `config:"owned_by"` } +// ItemMeta defines metadata we cache for each Item ID +type ItemMeta struct { + SequenceID int64 // the most recent event processed for this item + ParentID string // ID of the parent directory of this item + Name string // leaf name of this item +} + // Fs represents a remote box type Fs struct { - name string // name of this remote - root string // the path we are working on - opt Options // parsed options - features *fs.Features // optional features - srv *rest.Client // the connection to the server - dirCache *dircache.DirCache // Map of directory path to directory id - pacer *fs.Pacer // pacer for API calls - tokenRenewer *oauthutil.Renew // renew the token on expiry - uploadToken *pacer.TokenDispenser // control concurrency + name string // name of this remote + root string // the path we are working on + opt Options // parsed options + features *fs.Features // optional features + srv *rest.Client // the connection to the server + dirCache *dircache.DirCache // Map of directory path to directory id + pacer *fs.Pacer // pacer for API calls + tokenRenewer *oauthutil.Renew // renew the token on expiry + uploadToken *pacer.TokenDispenser // control concurrency + itemMetaCacheMu *sync.Mutex // protects itemMetaCache + itemMetaCache map[string]ItemMeta // map of Item ID to selected metadata } // Object describes a box object @@ -422,12 +431,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci := fs.GetConfig(ctx) f := &Fs{ - name: name, - root: root, - opt: *opt, - srv: rest.NewClient(client).SetRoot(rootURL), - pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), - uploadToken: pacer.NewTokenDispenser(ci.Transfers), + name: name, + root: root, + opt: *opt, + srv: rest.NewClient(client).SetRoot(rootURL), + pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), + uploadToken: pacer.NewTokenDispenser(ci.Transfers), + itemMetaCacheMu: new(sync.Mutex), + itemMetaCache: make(map[string]ItemMeta), } f.features = (&fs.Features{ CaseInsensitive: true, @@ -682,6 +693,17 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } entries = append(entries, o) } + + // Cache some metadata for this Item to help us process events later + // on. In particular, the box event API does not provide the old path + // of the Item when it is renamed/deleted/moved/etc. + f.itemMetaCacheMu.Lock() + cachedItemMeta, found := f.itemMetaCache[info.ID] + if !found || cachedItemMeta.SequenceID < info.SequenceID { + f.itemMetaCache[info.ID] = ItemMeta{SequenceID: info.SequenceID, ParentID: directoryID, Name: info.Name} + } + f.itemMetaCacheMu.Unlock() + return false }) if err != nil { @@ -1152,6 +1174,244 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) { return err } +// ChangeNotify calls the passed function with a path that has had changes. +// If the implementation uses polling, it should adhere to the given interval. +// +// Automatically restarts itself in case of unexpected behavior of the remote. +// +// Close the returned channel to stop being notified. +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { + go func() { + // get the `stream_position` early so all changes from now on get processed + streamPosition, err := f.changeNotifyStreamPosition(ctx) + if err != nil { + fs.Infof(f, "Failed to get StreamPosition: %s", err) + } + + var ticker *time.Ticker + var tickerC <-chan time.Time + for { + select { + case pollInterval, ok := <-pollIntervalChan: + if !ok { + if ticker != nil { + ticker.Stop() + } + return + } + if ticker != nil { + ticker.Stop() + ticker, tickerC = nil, nil + } + if pollInterval != 0 { + ticker = time.NewTicker(pollInterval) + tickerC = ticker.C + } + case <-tickerC: + if streamPosition == "" { + streamPosition, err = f.changeNotifyStreamPosition(ctx) + if err != nil { + fs.Infof(f, "Failed to get StreamPosition: %s", err) + continue + } + } + streamPosition, err = f.changeNotifyRunner(ctx, notifyFunc, streamPosition) + if err != nil { + fs.Infof(f, "Change notify listener failure: %s", err) + } + } + } + }() +} + +func (f *Fs) changeNotifyStreamPosition(ctx context.Context) (streamPosition string, err error) { + opts := rest.Opts{ + Method: "GET", + Path: "/events", + Parameters: fieldsValue(), + } + opts.Parameters.Set("stream_position", "now") + opts.Parameters.Set("stream_type", "changes") + + var result api.Events + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + resp, err = f.srv.CallJSON(ctx, &opts, nil, &result) + return shouldRetry(ctx, resp, err) + }) + if err != nil { + return "", err + } + + return strconv.FormatInt(result.NextStreamPosition, 10), nil +} + +// Attempts to construct the full path for an object, given the ID of its +// parent directory and the name of the object. +// +// Can return "" if the parentID is not currently in the directory cache. +func (f *Fs) getFullPath(parentID string, childName string) (fullPath string) { + fullPath = "" + name := f.opt.Enc.ToStandardName(childName) + if parentID != "" { + if parentDir, ok := f.dirCache.GetInv(parentID); ok { + if len(parentDir) > 0 { + fullPath = parentDir + "/" + name + } else { + fullPath = name + } + } + } else { + // No parent, this object is at the root + fullPath = name + } + return fullPath +} + +func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), streamPosition string) (nextStreamPosition string, err error) { + nextStreamPosition = streamPosition + + // box can send duplicate Event IDs; filter any in a single notify run + processedEventIDs := make(map[string]bool) + + for { + limit := f.opt.ListChunk + + // box only allows a max of 500 events + if limit > 500 { + limit = 500 + } + + opts := rest.Opts{ + Method: "GET", + Path: "/events", + Parameters: fieldsValue(), + } + opts.Parameters.Set("stream_position", nextStreamPosition) + opts.Parameters.Set("stream_type", "changes") + opts.Parameters.Set("limit", strconv.Itoa(limit)) + + var result api.Events + var resp *http.Response + fs.Debugf(f, "Checking for changes on remote (next_stream_position: %q)", nextStreamPosition) + err = f.pacer.Call(func() (bool, error) { + resp, err = f.srv.CallJSON(ctx, &opts, nil, &result) + return shouldRetry(ctx, resp, err) + }) + if err != nil { + return "", err + } + + if result.ChunkSize != int64(len(result.Entries)) { + return "", fmt.Errorf("invalid response to event request, chunk_size (%v) not equal to number of entries (%v)", result.ChunkSize, len(result.Entries)) + } + + nextStreamPosition = strconv.FormatInt(result.NextStreamPosition, 10) + if result.ChunkSize == 0 { + return nextStreamPosition, nil + } + + type pathToClear struct { + path string + entryType fs.EntryType + } + var pathsToClear []pathToClear + newEventIDs := 0 + for _, entry := range result.Entries { + if entry.EventID == "" || processedEventIDs[entry.EventID] { // missing Event ID, or already saw this one + continue + } + processedEventIDs[entry.EventID] = true + newEventIDs++ + + if entry.Source.ID == "" { // missing File or Folder ID + continue + } + if entry.Source.Type != api.ItemTypeFile && entry.Source.Type != api.ItemTypeFolder { // event is not for a file or folder + continue + } + + // Only interested in event types that result in a file tree change + if _, found := api.FileTreeChangeEventTypes[entry.EventType]; !found { + continue + } + + f.itemMetaCacheMu.Lock() + itemMeta, cachedItemMetaFound := f.itemMetaCache[entry.Source.ID] + if cachedItemMetaFound { + if itemMeta.SequenceID >= entry.Source.SequenceID { + // Item in the cache has the same or newer SequenceID than + // this event. Ignore this event, it must be old. + f.itemMetaCacheMu.Unlock() + continue + } + + // This event is newer. Delete its entry from the cache, + // we'll notify about its change below, then it's up to a + // future list operation to repopulate the cache. + delete(f.itemMetaCache, entry.Source.ID) + } + f.itemMetaCacheMu.Unlock() + + entryType := fs.EntryDirectory + if entry.Source.Type == api.ItemTypeFile { + entryType = fs.EntryObject + } + + // The box event only includes the new path for the object (e.g. + // the path after the object was moved). If there was an old path + // saved in our cache, it must be cleared. + if cachedItemMetaFound { + path := f.getFullPath(itemMeta.ParentID, itemMeta.Name) + if path != "" { + pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType}) + } + + // If this is a directory, also delete it from the dir cache. + // This will effectively invalidate the item metadata cache + // entries for all descendents of this directory, since we + // will no longer be able to construct a full path for them. + // This is exactly what we want, since we don't want to notify + // on the paths of these descendents if one of their ancestors + // has been renamed/deleted. + if entry.Source.Type == api.ItemTypeFolder { + f.dirCache.FlushDir(path) + } + } + + // If the item is "active", then it is not trashed or deleted, so + // it potentially has a valid parent. + // + // Construct the new path of the object, based on the Parent ID + // and its name. If we get an empty result, it means we don't + // currently know about this object so notification is unnecessary. + if entry.Source.ItemStatus == api.ItemStatusActive { + path := f.getFullPath(entry.Source.Parent.ID, entry.Source.Name) + if path != "" { + pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType}) + } + } + } + + // box can sometimes repeatedly return the same Event IDs within a + // short period of time. If it stops giving us new ones, treat it + // the same as if it returned us none at all. + if newEventIDs == 0 { + return nextStreamPosition, nil + } + + notifiedPaths := make(map[string]bool) + for _, p := range pathsToClear { + if _, ok := notifiedPaths[p.path]; ok { + continue + } + notifiedPaths[p.path] = true + notifyFunc(p.path, p.entryType) + } + fs.Debugf(f, "Received %v events, resulting in %v paths and %v notifications", len(result.Entries), len(pathsToClear), len(notifiedPaths)) + } +} + // DirCacheFlush resets the directory cache - used in testing as an // optional interface func (f *Fs) DirCacheFlush() {