FIX: correctly re-conciliate message bus backlog (#22020)

We have been struggling a lot on this lately as it's almost impossible to write a decent test for this.

The important things which need to happen:
- fetch the unread/mention state and last message bus channel ids of each chat channels
- stop all subscriptions
- restart global chat subscriptions
- update channels with new state and ensure the message bus ids are updated
- restart subscriptions of each chat channel

As a followup we need to start implementing a standard way to query for a resource state. Something similar to: `/channels/tracking` and `/channels/:id/tracking`

Each of these endpoints would return a state similar to:

```json
{
  tracking: { ... },
  message_bus_ids: { ... }
}
This commit is contained in:
Joffrey JAFFEUX 2023-06-09 09:00:24 +02:00 committed by GitHub
parent e08f713668
commit dbf3ff1738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -113,6 +113,11 @@ export default class Chat extends Service {
// NOTE: channels is more than a simple array, it also contains
// tracking and membership data, see Chat::StructuredChannelSerializer
this.chatApi.listCurrentUserChannels().then((channelsView) => {
this.chatSubscriptionsManager.stopChannelsSubscriptions();
this.chatSubscriptionsManager.startChannelsSubscriptions(
channelsView.meta.message_bus_last_ids
);
[
...channelsView.public_channels,
...channelsView.direct_message_channels,
@ -120,6 +125,9 @@ export default class Chat extends Service {
this.chatChannelsManager
.find(channelObject.id, { fetchIfNotFound: false })
.then((channel) => {
if (!channel) {
return;
}
// TODO (martin) We need to do something here for thread tracking
// state as well on presence change, otherwise we will be back in
// the same place as the channels were.
@ -127,17 +135,18 @@ export default class Chat extends Service {
// At some point it would likely be better to just fetch an
// endpoint that gives you all channel tracking state and the
// thread tracking state for the current channel.
if (channel) {
channel.meta.message_bus_last_ids =
channelObject.meta.message_bus_last_ids;
channel.updateMembership(channelObject.current_user_membership);
const channelTrackingState =
channelsView.tracking.channel_tracking[channel.id];
channel.tracking.unreadCount =
channelTrackingState.unread_count;
channel.tracking.mentionCount =
channelTrackingState.mention_count;
}
// ensures we have the latest message bus ids
channel.meta.message_bus_last_ids =
channelObject.meta.message_bus_last_ids;
const state = channelsView.tracking.channel_tracking[channel.id];
channel.tracking.unreadCount = state.unread_count;
channel.tracking.mentionCount = state.mention_count;
channel.updateMembership(channelObject.current_user_membership);
this.chatSubscriptionsManager.startChannelSubscription(channel);
});
});
});