diff --git a/backend/cache/cache.go b/backend/cache/cache.go index e40611a0a..439c3ac13 100644 --- a/backend/cache/cache.go +++ b/backend/cache/cache.go @@ -337,6 +337,9 @@ func NewFs(name, rootPath string) (fs.Fs, error) { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP) atexit.Register(func() { + if plexURL != "" { + f.plexConnector.closeWebsocket() + } f.StopBackgroundRunners() }) go func() { diff --git a/backend/cache/handle.go b/backend/cache/handle.go index 05e629532..3a5c54a3a 100644 --- a/backend/cache/handle.go +++ b/backend/cache/handle.go @@ -146,34 +146,17 @@ func (r *Handle) scaleWorkers(desired int) { } } -func (r *Handle) requestExternalConfirmation() { - // if there's no external confirmation available - // then we skip this step - if len(r.workers) >= r.cacheFs().totalMaxWorkers || - !r.cacheFs().plexConnector.isConnected() { - return - } - go r.cacheFs().plexConnector.isPlayingAsync(r.cachedObject, r.confirmReading) -} - func (r *Handle) confirmExternalReading() { // if we have a max value of workers // or there's no external confirmation available // then we skip this step - if len(r.workers) >= r.cacheFs().totalMaxWorkers || + if len(r.workers) > 1 || !r.cacheFs().plexConnector.isConnected() { return } - - select { - case confirmed := <-r.confirmReading: - if !confirmed { - return - } - default: + if !r.cacheFs().plexConnector.isPlaying(r.cachedObject) { return } - fs.Infof(r, "confirmed reading by external reader") r.scaleWorkers(r.cacheFs().totalMaxWorkers) } @@ -209,8 +192,6 @@ func (r *Handle) queueOffset(offset int64) { r.seenOffsets[o] = true r.preloadQueue <- o } - - r.requestExternalConfirmation() } } @@ -294,7 +275,6 @@ func (r *Handle) Read(p []byte) (n int, err error) { // first reading if !r.reading { r.reading = true - r.requestExternalConfirmation() } // reached EOF if r.offset >= r.cachedObject.Size() { diff --git a/backend/cache/plex.go b/backend/cache/plex.go index a9395a3ce..e9e83dcfb 100644 --- a/backend/cache/plex.go +++ b/backend/cache/plex.go @@ -17,21 +17,49 @@ import ( "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/config" + "github.com/patrickmn/go-cache" + "golang.org/x/net/websocket" ) const ( // defPlexLoginURL is the default URL for Plex login - defPlexLoginURL = "https://plex.tv/users/sign_in.json" + defPlexLoginURL = "https://plex.tv/users/sign_in.json" + defPlexNotificationURL = "%s/:/websockets/notifications?X-Plex-Token=%s" ) +// PlaySessionStateNotification is part of the API response of Plex +type PlaySessionStateNotification struct { + SessionKey string `json:"sessionKey"` + GUID string `json:"guid"` + Key string `json:"key"` + ViewOffset int64 `json:"viewOffset"` + State string `json:"state"` + TranscodeSession string `json:"transcodeSession"` +} + +// NotificationContainer is part of the API response of Plex +type NotificationContainer struct { + Type string `json:"type"` + Size int `json:"size"` + PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"` +} + +// PlexNotification is part of the API response of Plex +type PlexNotification struct { + Container NotificationContainer `json:"NotificationContainer"` +} + // plexConnector is managing the cache integration with Plex type plexConnector struct { - url *url.URL - username string - password string - token string - f *Fs - mu sync.Mutex + url *url.URL + username string + password string + token string + f *Fs + mu sync.Mutex + running bool + runningMu sync.Mutex + stateCache *cache.Cache } // newPlexConnector connects to a Plex server and generates a token @@ -42,11 +70,12 @@ func newPlexConnector(f *Fs, plexURL, username, password string) (*plexConnector } pc := &plexConnector{ - f: f, - url: u, - username: username, - password: password, - token: "", + f: f, + url: u, + username: username, + password: password, + token: "", + stateCache: cache.New(time.Hour, time.Minute), } return pc, nil @@ -60,14 +89,80 @@ func newPlexConnectorWithToken(f *Fs, plexURL, token string) (*plexConnector, er } pc := &plexConnector{ - f: f, - url: u, - token: token, + f: f, + url: u, + token: token, + stateCache: cache.New(time.Hour, time.Minute), } + pc.listenWebsocket() return pc, nil } +func (p *plexConnector) closeWebsocket() { + p.runningMu.Lock() + defer p.runningMu.Unlock() + fs.Infof("plex", "stopped Plex watcher") + p.running = false +} + +func (p *plexConnector) listenWebsocket() { + u := strings.Replace(p.url.String(), "http://", "ws://", 1) + u = strings.Replace(u, "https://", "ws://", 1) + conn, err := websocket.Dial(fmt.Sprintf(defPlexNotificationURL, strings.TrimRight(u, "/"), p.token), + "", "http://localhost") + if err != nil { + fs.Errorf("plex", "%v", err) + return + } + + p.running = true + go func() { + for { + if !p.isConnected() { + break + } + + notif := &PlexNotification{} + err := websocket.JSON.Receive(conn, notif) + if err != nil { + fs.Debugf("plex", "%v", err) + time.Sleep(time.Second) + continue + } + // we're only interested in play events + if notif.Container.Type == "playing" { + // we loop through each of them + for _, v := range notif.Container.PlaySessionState { + // event type of playing + if v.State == "playing" { + // if it's not cached get the details and cache them + if _, found := p.stateCache.Get(v.Key); !found { + req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), v.Key), nil) + if err != nil { + continue + } + p.fillDefaultHeaders(req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + continue + } + var data []byte + data, err = ioutil.ReadAll(resp.Body) + if err != nil { + continue + } + p.stateCache.Set(v.Key, data, cache.DefaultExpiration) + } + } else if v.State == "stopped" { + p.stateCache.Delete(v.Key) + } + } + } + } + }() +} + // fillDefaultHeaders will add common headers to requests func (p *plexConnector) fillDefaultHeaders(req *http.Request) { req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String())) @@ -115,13 +210,16 @@ func (p *plexConnector) authenticate() error { config.SaveConfig() fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String()) } + p.listenWebsocket() return nil } // isConnected checks if this rclone is authenticated to Plex func (p *plexConnector) isConnected() bool { - return p.token != "" + p.runningMu.Lock() + defer p.runningMu.Unlock() + return p.running } // isConfigured checks if this rclone is configured to use a Plex server @@ -142,64 +240,8 @@ func (p *plexConnector) isPlaying(co *Object) bool { } isPlaying := false - req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil) - if err != nil { - return false - } - p.fillDefaultHeaders(req) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return false - } - var data map[string]interface{} - err = json.NewDecoder(resp.Body).Decode(&data) - if err != nil { - return false - } - sizeGen, ok := get(data, "MediaContainer", "size") - if !ok { - return false - } - size, ok := sizeGen.(float64) - if !ok || size < float64(1) { - return false - } - videosGen, ok := get(data, "MediaContainer", "Video") - if !ok { - fs.Debugf("plex", "empty videos: %v", data) - return false - } - videos, ok := videosGen.([]interface{}) - if !ok || len(videos) < 1 { - fs.Debugf("plex", "empty videos: %v", data) - return false - } - for _, v := range videos { - keyGen, ok := get(v, "key") - if !ok { - fs.Debugf("plex", "failed to find: key") - continue - } - key, ok := keyGen.(string) - if !ok { - fs.Debugf("plex", "failed to understand: key") - continue - } - req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), key), nil) - if err != nil { - return false - } - p.fillDefaultHeaders(req) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return false - } - var data []byte - data, err = ioutil.ReadAll(resp.Body) - if err != nil { - return false - } - if bytes.Contains(data, []byte(remote)) { + for _, v := range p.stateCache.Items() { + if bytes.Contains(v.Object.([]byte), []byte(remote)) { isPlaying = true break } @@ -208,12 +250,6 @@ func (p *plexConnector) isPlaying(co *Object) bool { return isPlaying } -func (p *plexConnector) isPlayingAsync(co *Object, response chan bool) { - time.Sleep(time.Second) // FIXME random guess here - res := p.isPlaying(co) - response <- res -} - // adapted from: https://stackoverflow.com/a/28878037 (credit) func get(m interface{}, path ...interface{}) (interface{}, bool) { for _, p := range path {