Skip to main content

tuwunel_service/rooms/state_accessor/
room_state.rs

1use futures::{Stream, StreamExt, TryFutureExt};
2use ruma::{OwnedEventId, RoomId, events::StateEventType};
3use serde::Deserialize;
4use tuwunel_core::{
5	Result, err, implement,
6	matrix::{Event, Pdu, StateKey},
7};
8
9/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
10#[implement(super::Service)]
11pub async fn room_state_get_content<T>(
12	&self,
13	room_id: &RoomId,
14	event_type: &StateEventType,
15	state_key: &str,
16) -> Result<T>
17where
18	T: for<'de> Deserialize<'de> + Send,
19{
20	self.room_state_get(room_id, event_type, state_key)
21		.await
22		.and_then(|event| event.get_content())
23}
24
25/// Returns the room state events for a specific type.
26#[implement(super::Service)]
27#[tracing::instrument(skip(self), level = "debug")]
28pub fn room_state_type_pdus<'a>(
29	&'a self,
30	room_id: &'a RoomId,
31	event_type: &'a StateEventType,
32) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
33	self.services
34		.state
35		.get_room_shortstatehash(room_id)
36		.map_ok(|shortstatehash| {
37			self.state_type_pdus(shortstatehash, event_type)
38				.map(Ok)
39				.boxed()
40		})
41		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
42		.try_flatten_stream()
43}
44
45/// Returns the full room state.
46#[implement(super::Service)]
47#[tracing::instrument(skip(self), level = "debug")]
48pub fn room_state_full<'a>(
49	&'a self,
50	room_id: &'a RoomId,
51) -> impl Stream<Item = Result<((StateEventType, StateKey), impl Event)>> + Send + 'a {
52	self.services
53		.state
54		.get_room_shortstatehash(room_id)
55		.map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok).boxed())
56		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
57		.try_flatten_stream()
58}
59
60/// Returns the full room state pdus
61#[implement(super::Service)]
62#[tracing::instrument(skip(self), level = "debug")]
63pub fn room_state_full_pdus<'a>(
64	&'a self,
65	room_id: &'a RoomId,
66) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
67	self.services
68		.state
69		.get_room_shortstatehash(room_id)
70		.map_ok(|shortstatehash| {
71			self.state_full_pdus(shortstatehash)
72				.map(Ok)
73				.boxed()
74		})
75		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
76		.try_flatten_stream()
77}
78
79/// Returns a single EventId from `room_id` with key (`event_type`,
80/// `state_key`).
81#[implement(super::Service)]
82#[tracing::instrument(skip(self), level = "debug")]
83pub async fn room_state_get_id(
84	&self,
85	room_id: &RoomId,
86	event_type: &StateEventType,
87	state_key: &str,
88) -> Result<OwnedEventId> {
89	self.services
90		.state
91		.get_room_shortstatehash(room_id)
92		.and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
93		.await
94}
95
96/// Iterates the state_keys for an event_type in the state joined by the
97/// `event_id` from the current state.
98#[implement(super::Service)]
99#[tracing::instrument(skip(self), level = "debug")]
100pub fn room_state_keys_with_ids<'a>(
101	&'a self,
102	room_id: &'a RoomId,
103	event_type: &'a StateEventType,
104) -> impl Stream<Item = Result<(StateKey, OwnedEventId)>> + Send + 'a {
105	self.services
106		.state
107		.get_room_shortstatehash(room_id)
108		.map_ok(|shortstatehash| {
109			self.state_keys_with_ids(shortstatehash, event_type)
110				.map(Ok)
111				.boxed()
112		})
113		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
114		.try_flatten_stream()
115}
116
117/// Iterates the state_keys for an event_type in the state
118#[implement(super::Service)]
119#[tracing::instrument(skip(self), level = "debug")]
120pub fn room_state_keys<'a>(
121	&'a self,
122	room_id: &'a RoomId,
123	event_type: &'a StateEventType,
124) -> impl Stream<Item = Result<StateKey>> + Send + 'a {
125	self.services
126		.state
127		.get_room_shortstatehash(room_id)
128		.map_ok(|shortstatehash| {
129			self.state_keys(shortstatehash, event_type)
130				.map(Ok)
131				.boxed()
132		})
133		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
134		.try_flatten_stream()
135}
136
137/// Returns a single PDU from `room_id` with key (`event_type`,
138/// `state_key`).
139#[implement(super::Service)]
140#[tracing::instrument(skip(self), level = "debug")]
141pub async fn room_state_get(
142	&self,
143	room_id: &RoomId,
144	event_type: &StateEventType,
145	state_key: &str,
146) -> Result<Pdu> {
147	self.services
148		.state
149		.get_room_shortstatehash(room_id)
150		.and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
151		.await
152}