Skip to main content

tuwunel_api/client/
threads.rs

1use axum::extract::State;
2use futures::{StreamExt, TryStreamExt};
3use ruma::api::client::threads::get_threads;
4use tuwunel_core::{
5	Result, at,
6	matrix::{
7		Event,
8		pdu::{PduCount, PduEvent},
9	},
10	result::FlatOk,
11};
12
13use crate::Ruma;
14
15/// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
16pub(crate) async fn get_threads_route(
17	State(services): State<crate::State>,
18	ref body: Ruma<get_threads::v1::Request>,
19) -> Result<get_threads::v1::Response> {
20	// Use limit or else 10, with maximum 100
21	let limit = body
22		.limit
23		.map(usize::try_from)
24		.flat_ok()
25		.unwrap_or(10)
26		.min(100);
27
28	let from: PduCount = body
29		.from
30		.as_deref()
31		.map(str::parse)
32		.transpose()?
33		.unwrap_or_else(PduCount::max);
34
35	let threads: Vec<(PduCount, PduEvent)> = services
36		.threads
37		.threads_until(body.sender_user(), &body.room_id, from, &body.include)
38		.try_filter_map(async |(count, pdu)| {
39			Ok(services
40				.state_accessor
41				.user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
42				.await
43				.then_some((count, pdu)))
44		})
45		.take(limit)
46		.try_collect()
47		.await?;
48
49	Ok(get_threads::v1::Response {
50		next_batch: threads
51			.last()
52			.map(at!(0))
53			.as_ref()
54			.map(ToString::to_string),
55
56		chunk: threads
57			.into_iter()
58			.map(at!(1))
59			.map(Event::into_format)
60			.collect(),
61	})
62}