tuwunel_api/client/sync/
mod.rs1mod v3;
2mod v5;
3
4use futures::{FutureExt, StreamExt, pin_mut};
5use ruma::{RoomId, UserId};
6use tuwunel_core::{
7 Error, PduCount, Result,
8 matrix::pdu::PduEvent,
9 utils::stream::{BroadbandExt, ReadyExt},
10};
11use tuwunel_service::Services;
12
13pub(crate) use self::{
14 v3::{calculate_heroes, sync_events_route},
15 v5::sync_events_v5_route,
16};
17
18async fn load_timeline(
19 services: &Services,
20 sender_user: &UserId,
21 room_id: &RoomId,
22 roomsincecount: PduCount,
23 next_batch: Option<PduCount>,
24 limit: usize,
25) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount), Error> {
26 let last_timeline_count = services
27 .timeline
28 .last_timeline_count(Some(sender_user), room_id, next_batch)
29 .await?;
30
31 if last_timeline_count <= roomsincecount {
32 return Ok((Vec::new(), false, last_timeline_count));
33 }
34
35 let non_timeline_pdus = services
36 .timeline
37 .pdus_rev(Some(sender_user), room_id, None)
38 .ready_filter_map(Result::ok)
39 .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
40 .ready_take_while(|&(pducount, _)| pducount > roomsincecount);
41
42 pin_mut!(non_timeline_pdus);
44 let timeline_pdus: Vec<_> = non_timeline_pdus
45 .by_ref()
46 .take(limit)
47 .collect()
48 .map(|mut pdus: Vec<_>| {
49 pdus.reverse();
50 pdus
51 })
52 .await;
53
54 let limited = non_timeline_pdus.next().await.is_some();
57
58 Ok((timeline_pdus, limited, last_timeline_count))
59}
60
61async fn share_encrypted_room(
62 services: &Services,
63 sender_user: &UserId,
64 user_id: &UserId,
65 ignore_room: Option<&RoomId>,
66) -> bool {
67 services
68 .state_cache
69 .get_shared_rooms(sender_user, user_id)
70 .ready_filter(|&room_id| Some(room_id) != ignore_room)
71 .map(ToOwned::to_owned)
72 .broad_any(async |other_room_id| {
73 services
74 .state_accessor
75 .is_encrypted_room(&other_room_id)
76 .await
77 })
78 .await
79}