tuwunel_api/client/
threads.rs1use axum::extract::State;
2use futures::{StreamExt, TryStreamExt};
3use ruma::api::client::threads::get_threads;
4use tuwunel_core::{
5 Result, at,
6 matrix::{
7 Event,
8 pdu::{PduCount, PduEvent},
9 },
10 result::FlatOk,
11};
12
13use crate::Ruma;
14
15pub(crate) async fn get_threads_route(
17 State(services): State<crate::State>,
18 ref body: Ruma<get_threads::v1::Request>,
19) -> Result<get_threads::v1::Response> {
20 let limit = body
22 .limit
23 .map(usize::try_from)
24 .flat_ok()
25 .unwrap_or(10)
26 .min(100);
27
28 let from: PduCount = body
29 .from
30 .as_deref()
31 .map(str::parse)
32 .transpose()?
33 .unwrap_or_else(PduCount::max);
34
35 let threads: Vec<(PduCount, PduEvent)> = services
36 .threads
37 .threads_until(body.sender_user(), &body.room_id, from, &body.include)
38 .try_filter_map(async |(count, pdu)| {
39 Ok(services
40 .state_accessor
41 .user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
42 .await
43 .then_some((count, pdu)))
44 })
45 .take(limit)
46 .try_collect()
47 .await?;
48
49 Ok(get_threads::v1::Response {
50 next_batch: threads
51 .last()
52 .map(at!(0))
53 .as_ref()
54 .map(ToString::to_string),
55
56 chunk: threads
57 .into_iter()
58 .map(at!(1))
59 .map(Event::into_format)
60 .collect(),
61 })
62}