tuwunel_api/client/
threads.rs1use axum::extract::State;
2use futures::{StreamExt, TryStreamExt};
3use ruma::api::client::threads::get_threads;
4use tuwunel_core::{
5 Result, at,
6 matrix::{
7 Event,
8 pdu::{PduCount, PduEvent},
9 },
10 result::FlatOk,
11 utils::stream::TryWidebandExt,
12};
13
14use crate::Ruma;
15
16pub(crate) async fn get_threads_route(
18 State(services): State<crate::State>,
19 ref body: Ruma<get_threads::v1::Request>,
20) -> Result<get_threads::v1::Response> {
21 let limit = body
23 .limit
24 .map(usize::try_from)
25 .flat_ok()
26 .unwrap_or(10)
27 .min(100);
28
29 let from: PduCount = body
30 .from
31 .as_deref()
32 .map(str::parse)
33 .transpose()?
34 .unwrap_or_else(PduCount::max);
35
36 let threads: Vec<(PduCount, PduEvent)> = services
37 .threads
38 .threads_until(body.sender_user(), &body.room_id, from, &body.include)
39 .try_filter_map(async |(count, pdu)| {
40 Ok(services
41 .state_accessor
42 .user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
43 .await
44 .then_some((count, pdu)))
45 })
46 .take(limit)
47 .wide_and_then(async |(count, pdu)| {
48 let pdu = services
49 .pdu_metadata
50 .bundle_aggregations(body.sender_user(), pdu)
51 .await;
52
53 Ok((count, pdu))
54 })
55 .try_collect()
56 .await?;
57
58 Ok(get_threads::v1::Response {
59 next_batch: threads
60 .last()
61 .map(at!(0))
62 .as_ref()
63 .map(ToString::to_string),
64
65 chunk: threads
66 .into_iter()
67 .map(at!(1))
68 .map(Event::into_format)
69 .collect(),
70 })
71}