1mod bump_stamp;
2mod heroes;
3
4use std::collections::{BTreeMap, HashSet};
5
6use futures::{
7 FutureExt, StreamExt, TryFutureExt, TryStreamExt,
8 future::{join, join3, join4},
9};
10use ruma::{
11 JsOption, OwnedRoomId, UserId,
12 api::client::sync::sync_events::{UnreadNotificationsCount, v5::response},
13 events::{StateEventType, TimelineEventType, room::member::MembershipState},
14};
15use tuwunel_core::{
16 Result, at, err, error, is_equal_to,
17 itertools::Itertools,
18 matrix::{Event, StateKey, pdu::PduCount},
19 ref_at,
20 utils::{
21 BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::usize_from_ruma, result::FlatOk,
22 stream::BroadbandExt,
23 },
24};
25use tuwunel_service::sync::Room;
26
27use self::{bump_stamp::room_bump_stamp, heroes::calculate_heroes};
28use super::{super::load_timeline, Connection, SyncInfo, Window, WindowRoom};
29use crate::client::{annotate_membership, ignored_filter, with_membership};
30
31#[tracing::instrument(
32 name = "rooms",
33 level = "debug",
34 skip_all,
35 fields(
36 next_batch = conn.next_batch,
37 window = window.len(),
38 )
39)]
40pub(super) async fn handle(
41 sync_info: SyncInfo<'_>,
42 conn: &Connection,
43 window: &Window,
44) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
45 window
46 .iter()
47 .stream()
48 .broad_filter_map(async |(room_id, room)| {
49 handle_room(sync_info, conn, room)
50 .map_ok(move |room| (room_id.clone(), room))
51 .inspect_err(|e| error!(?room_id, "sync handler: {e:?}"))
52 .await
53 .ok()
54 })
55 .collect()
56 .map(Ok)
57 .await
58}
59
60#[tracing::instrument(
61 name = "room",
62 level = "debug",
63 skip_all,
64 fields(room_id, roomsince)
65)]
66async fn handle_room(
67 SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
68 conn: &Connection,
69 WindowRoom {
70 lists, membership, room_id, last_count, ..
71 }: &WindowRoom,
72) -> Result<response::Room> {
73 let &Room { roomsince, .. } = conn
74 .rooms
75 .get(room_id)
76 .ok_or_else(|| err!("Missing connection state for {room_id}"))?;
77
78 debug_assert!(
79 *last_count > roomsince || *last_count == 0 || roomsince == 0,
80 "Stale room shouldn't be in the window"
81 );
82
83 if matches!(*membership, Some(MembershipState::Leave | MembershipState::Ban)) {
84 return Ok(response::Room {
85 initial: roomsince.eq(&0).then_some(true),
86 lists: lists.clone(),
87 membership: membership.clone(),
88 prev_batch: Some(conn.next_batch.to_string().into()),
89 limited: true,
90 required_state: vec![
91 services
92 .state_accessor
93 .room_state_get(room_id, &StateEventType::RoomMember, sender_user.as_str())
94 .map_ok(Event::into_format)
95 .await?,
96 ],
97
98 ..Default::default()
99 });
100 }
101
102 let is_invite = *membership == Some(MembershipState::Invite);
103
104 let encrypted = services
105 .state_accessor
106 .is_encrypted_room(room_id)
107 .await;
108
109 let default_details = (0_usize, HashSet::new());
110 let (timeline_limit, required_state) = lists
111 .iter()
112 .filter_map(|list_id| conn.lists.get(list_id))
113 .map(|list| &list.room_details)
114 .chain(conn.subscriptions.get(room_id).into_iter())
115 .fold(default_details, |(mut timeline_limit, mut required_state), config| {
116 let limit = usize_from_ruma(config.timeline_limit);
117
118 timeline_limit = timeline_limit.max(limit);
119 required_state.extend(config.required_state.clone());
120
121 (timeline_limit, required_state)
122 });
123
124 let timeline = is_invite.is_false().then_async(|| {
125 load_timeline(
126 services,
127 sender_user,
128 room_id,
129 PduCount::Normal(roomsince),
130 Some(PduCount::from(conn.next_batch)),
131 timeline_limit,
132 )
133 });
134
135 let (timeline_pdus, limited, last_timeline_count) = timeline
136 .await
137 .flat_ok()
138 .unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
139
140 let required_state = required_state
141 .into_iter()
142 .filter(|_| !timeline_pdus.is_empty())
143 .collect::<Vec<_>>();
144
145 let prev_batch = timeline_pdus
146 .first()
147 .map(at!(0))
148 .map(PduCount::into_unsigned)
149 .as_ref()
150 .map(ToString::to_string);
151
152 let bump_stamp = room_bump_stamp(
153 services,
154 sender_user,
155 room_id,
156 PduCount::Normal(roomsince),
157 PduCount::from(conn.next_batch),
158 last_timeline_count,
159 )
160 .await;
161
162 let num_live = roomsince
163 .ne(&0)
164 .and_is(limited || timeline_pdus.len() >= timeline_limit)
165 .then_async(|| {
166 services
167 .timeline
168 .pdus(None, room_id, Some(roomsince.into()))
169 .count()
170 .map(TryInto::try_into)
171 .map(Result::ok)
172 });
173
174 let lazy = required_state
175 .iter()
176 .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into())));
177
178 let timeline_senders = timeline_pdus
179 .iter()
180 .filter(|_| lazy)
181 .map(ref_at!(1))
182 .map(Event::sender)
183 .map(UserId::as_str);
184
185 let timeline_member_targets = timeline_pdus
186 .iter()
187 .filter(|_| lazy)
188 .map(ref_at!(1))
189 .filter(|event| *event.event_type() == TimelineEventType::RoomMember)
190 .filter_map(Event::state_key);
191
192 let timeline_senders: Vec<_> = timeline_senders
193 .chain(timeline_member_targets)
194 .sorted_unstable()
195 .dedup()
196 .collect();
197
198 let timeline_senders = timeline_senders
199 .iter()
200 .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender)))
201 .stream();
202
203 let wildcard_state = required_state
204 .iter()
205 .filter(|(_, state_key)| state_key == "*")
206 .map(|(event_type, _)| {
207 services
208 .state_accessor
209 .room_state_keys(room_id, event_type)
210 .map_ok(|state_key| (event_type.clone(), state_key))
211 .ready_filter_map(Result::ok)
212 })
213 .stream()
214 .flatten();
215
216 let required_state = required_state
217 .iter()
218 .cloned()
219 .stream()
220 .chain(wildcard_state)
221 .chain(timeline_senders)
222 .broad_filter_map(async |state| {
223 let state_key: StateKey = match state.1.as_str() {
224 | "$LAZY" | "*" => return None,
225 | "$ME" => sender_user.as_str().into(),
226 | _ => state.1.clone(),
227 };
228
229 let mut pdu = services
230 .state_accessor
231 .room_state_get(room_id, &state.0, &state_key)
232 .map_ok(Event::into_pdu)
233 .ok()
234 .await?;
235
236 annotate_membership(services, &mut pdu, sender_user, encrypted).await;
237
238 Some(Event::into_format(pdu))
239 })
240 .collect();
241
242 let invite_state = is_invite.then_async(|| {
244 services
245 .state_cache
246 .invite_state(sender_user, room_id)
247 .ok()
248 });
249
250 let room_name = services
251 .state_accessor
252 .get_name(room_id)
253 .map_ok(Into::into)
254 .map(Result::ok);
255
256 let room_avatar = services
257 .state_accessor
258 .get_avatar(room_id)
259 .map_ok(|content| content.url)
260 .ok()
261 .map(Option::flatten);
262
263 let highlight_count = services
264 .pusher
265 .highlight_count(sender_user, room_id)
266 .map(TryInto::try_into)
267 .map(Result::ok);
268
269 let notification_count = services
270 .pusher
271 .notification_count(sender_user, room_id)
272 .map(TryInto::try_into)
273 .map(Result::ok);
274
275 let joined_count = services
276 .state_cache
277 .room_joined_count(room_id)
278 .map_ok(TryInto::try_into)
279 .map_ok(Result::ok)
280 .map(FlatOk::flat_ok);
281
282 let invited_count = services
283 .state_cache
284 .room_invited_count(room_id)
285 .map_ok(TryInto::try_into)
286 .map_ok(Result::ok)
287 .map(FlatOk::flat_ok);
288
289 let is_dm = services
290 .state_accessor
291 .is_direct(room_id, sender_user)
292 .map(|is_dm| is_dm.then_some(is_dm));
293
294 let last_read_count = services
295 .pusher
296 .last_notification_read(sender_user, room_id);
297
298 let timeline = timeline_pdus
299 .iter()
300 .stream()
301 .filter_map(|item| ignored_filter(services, item.clone(), sender_user))
302 .map(at!(1))
303 .broad_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
304 .map(Event::into_format)
305 .collect();
306
307 let meta = join3(room_name, room_avatar, is_dm);
308 let events = join4(timeline, num_live, required_state, invite_state);
309 let member_counts = join(joined_count, invited_count);
310 let notification_counts = join3(highlight_count, notification_count, last_read_count);
311 let (
312 (room_name, room_avatar, is_dm),
313 (timeline, num_live, required_state, invite_state),
314 (joined_count, invited_count),
315 (highlight_count, notification_count, _last_notification_read),
316 ) = join4(meta, events, member_counts, notification_counts)
317 .boxed()
318 .await;
319
320 let heroes = services
321 .config
322 .calculate_heroes
323 .then_async(|| {
324 calculate_heroes(
325 services,
326 sender_user,
327 room_id,
328 room_name.as_ref(),
329 room_avatar.as_deref(),
330 )
331 })
332 .await
333 .unwrap_or_default();
334
335 let (heroes, heroes_name, heroes_avatar) = heroes;
336
337 Ok(response::Room {
338 initial: roomsince.eq(&0).then_some(true),
339 lists: lists.clone(),
340 membership: membership.clone(),
341 name: room_name.or(heroes_name),
342 avatar: JsOption::from_option(room_avatar.or(heroes_avatar)),
343 is_dm,
344 heroes,
345 required_state,
346 invite_state: invite_state.flatten(),
347 prev_batch: prev_batch.as_deref().map(Into::into),
348 num_live: num_live.flatten(),
349 limited,
350 timeline,
351 bump_stamp,
352 joined_count,
353 invited_count,
354 unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
355 })
356}