Skip to main content

tuwunel_api/client/
threads.rs

1use axum::extract::State;
2use futures::{StreamExt, TryStreamExt};
3use ruma::api::client::threads::get_threads;
4use tuwunel_core::{
5	Result, at,
6	matrix::{
7		Event,
8		pdu::{PduCount, PduEvent},
9	},
10	result::FlatOk,
11	utils::stream::TryWidebandExt,
12};
13
14use crate::Ruma;
15
16/// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
17pub(crate) async fn get_threads_route(
18	State(services): State<crate::State>,
19	ref body: Ruma<get_threads::v1::Request>,
20) -> Result<get_threads::v1::Response> {
21	// Use limit or else 10, with maximum 100
22	let limit = body
23		.limit
24		.map(usize::try_from)
25		.flat_ok()
26		.unwrap_or(10)
27		.min(100);
28
29	let from: PduCount = body
30		.from
31		.as_deref()
32		.map(str::parse)
33		.transpose()?
34		.unwrap_or_else(PduCount::max);
35
36	let threads: Vec<(PduCount, PduEvent)> = services
37		.threads
38		.threads_until(body.sender_user(), &body.room_id, from, &body.include)
39		.try_filter_map(async |(count, pdu)| {
40			Ok(services
41				.state_accessor
42				.user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
43				.await
44				.then_some((count, pdu)))
45		})
46		.take(limit)
47		.wide_and_then(async |(count, pdu)| {
48			let pdu = services
49				.pdu_metadata
50				.bundle_aggregations(body.sender_user(), pdu)
51				.await;
52
53			Ok((count, pdu))
54		})
55		.try_collect()
56		.await?;
57
58	Ok(get_threads::v1::Response {
59		next_batch: threads
60			.last()
61			.map(at!(0))
62			.as_ref()
63			.map(ToString::to_string),
64
65		chunk: threads
66			.into_iter()
67			.map(at!(1))
68			.map(Event::into_format)
69			.collect(),
70	})
71}