Skip to main content

tuwunel_api/client/sync/
mod.rs

1mod v3;
2mod v5;
3
4use futures::{FutureExt, StreamExt, pin_mut};
5use ruma::{RoomId, UserId};
6use tuwunel_core::{
7	Error, PduCount, Result,
8	matrix::pdu::PduEvent,
9	utils::stream::{BroadbandExt, ReadyExt},
10};
11use tuwunel_service::Services;
12
13pub(crate) use self::{
14	v3::{calculate_heroes, sync_events_route},
15	v5::sync_events_v5_route,
16};
17
18async fn load_timeline(
19	services: &Services,
20	sender_user: &UserId,
21	room_id: &RoomId,
22	roomsincecount: PduCount,
23	next_batch: Option<PduCount>,
24	limit: usize,
25) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount), Error> {
26	let last_timeline_count = services
27		.timeline
28		.last_timeline_count(Some(sender_user), room_id, next_batch)
29		.await?;
30
31	if last_timeline_count <= roomsincecount {
32		return Ok((Vec::new(), false, last_timeline_count));
33	}
34
35	let non_timeline_pdus = services
36		.timeline
37		.pdus_rev(Some(sender_user), room_id, None)
38		.ready_filter_map(Result::ok)
39		.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
40		.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
41
42	// Take the last events for the timeline
43	pin_mut!(non_timeline_pdus);
44	let timeline_pdus: Vec<_> = non_timeline_pdus
45		.by_ref()
46		.take(limit)
47		.collect()
48		.map(|mut pdus: Vec<_>| {
49			pdus.reverse();
50			pdus
51		})
52		.await;
53
54	// They /sync response doesn't always return all messages, so we say the output
55	// is limited unless there are events in non_timeline_pdus
56	let limited = non_timeline_pdus.next().await.is_some();
57
58	Ok((timeline_pdus, limited, last_timeline_count))
59}
60
61async fn share_encrypted_room(
62	services: &Services,
63	sender_user: &UserId,
64	user_id: &UserId,
65	ignore_room: Option<&RoomId>,
66) -> bool {
67	services
68		.state_cache
69		.get_shared_rooms(sender_user, user_id)
70		.ready_filter(|&room_id| Some(room_id) != ignore_room)
71		.map(ToOwned::to_owned)
72		.broad_any(async |other_room_id| {
73			services
74				.state_accessor
75				.is_encrypted_room(&other_room_id)
76				.await
77		})
78		.await
79}