Skip to main content

tuwunel_service/rooms/state_accessor/
room_state.rs

1use futures::{Stream, StreamExt, TryFutureExt};
2use ruma::{OwnedEventId, RoomId, events::StateEventType};
3use serde::Deserialize;
4use tuwunel_core::{
5	Result, err, implement,
6	matrix::{Event, Pdu, StateKey},
7};
8
9/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
10#[implement(super::Service)]
11pub async fn room_state_get_content<T>(
12	&self,
13	room_id: &RoomId,
14	event_type: &StateEventType,
15	state_key: &str,
16) -> Result<T>
17where
18	T: for<'de> Deserialize<'de> + Send,
19{
20	self.room_state_get(room_id, event_type, state_key)
21		.await
22		.and_then(|event| event.get_content())
23}
24
25/// Returns the room state events for a specific type.
26#[implement(super::Service)]
27#[tracing::instrument(skip(self), level = "debug")]
28pub fn room_state_type_pdus<'a>(
29	&'a self,
30	room_id: &'a RoomId,
31	event_type: &'a StateEventType,
32) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
33	self.services
34		.state
35		.get_room_shortstatehash(room_id)
36		.map_ok(|shortstatehash| {
37			self.state_type_pdus(shortstatehash, event_type)
38				.map(Ok)
39		})
40		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
41		.try_flatten_stream()
42}
43
44/// Returns the full room state.
45#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "debug")]
47pub fn room_state_full<'a>(
48	&'a self,
49	room_id: &'a RoomId,
50) -> impl Stream<Item = Result<((StateEventType, StateKey), impl Event)>> + Send + 'a {
51	self.services
52		.state
53		.get_room_shortstatehash(room_id)
54		.map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok))
55		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
56		.try_flatten_stream()
57}
58
59/// Returns the full room state pdus
60#[implement(super::Service)]
61#[tracing::instrument(skip(self), level = "debug")]
62pub fn room_state_full_pdus<'a>(
63	&'a self,
64	room_id: &'a RoomId,
65) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
66	self.services
67		.state
68		.get_room_shortstatehash(room_id)
69		.map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok))
70		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
71		.try_flatten_stream()
72}
73
74/// Returns a single EventId from `room_id` with key (`event_type`,
75/// `state_key`).
76#[implement(super::Service)]
77#[tracing::instrument(skip(self), level = "debug")]
78pub async fn room_state_get_id(
79	&self,
80	room_id: &RoomId,
81	event_type: &StateEventType,
82	state_key: &str,
83) -> Result<OwnedEventId> {
84	self.services
85		.state
86		.get_room_shortstatehash(room_id)
87		.and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
88		.await
89}
90
91/// Iterates the state_keys for an event_type in the state joined by the
92/// `event_id` from the current state.
93#[implement(super::Service)]
94#[tracing::instrument(skip(self), level = "debug")]
95pub fn room_state_keys_with_ids<'a>(
96	&'a self,
97	room_id: &'a RoomId,
98	event_type: &'a StateEventType,
99) -> impl Stream<Item = Result<(StateKey, OwnedEventId)>> + Send + 'a {
100	self.services
101		.state
102		.get_room_shortstatehash(room_id)
103		.map_ok(|shortstatehash| {
104			self.state_keys_with_ids(shortstatehash, event_type)
105				.map(Ok)
106		})
107		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
108		.try_flatten_stream()
109}
110
111/// Iterates the state_keys for an event_type in the state
112#[implement(super::Service)]
113#[tracing::instrument(skip(self), level = "debug")]
114pub fn room_state_keys<'a>(
115	&'a self,
116	room_id: &'a RoomId,
117	event_type: &'a StateEventType,
118) -> impl Stream<Item = Result<StateKey>> + Send + 'a {
119	self.services
120		.state
121		.get_room_shortstatehash(room_id)
122		.map_ok(|shortstatehash| {
123			self.state_keys(shortstatehash, event_type)
124				.map(Ok)
125		})
126		.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
127		.try_flatten_stream()
128}
129
130/// Returns a single PDU from `room_id` with key (`event_type`,
131/// `state_key`).
132#[implement(super::Service)]
133#[tracing::instrument(skip(self), level = "debug")]
134pub async fn room_state_get(
135	&self,
136	room_id: &RoomId,
137	event_type: &StateEventType,
138	state_key: &str,
139) -> Result<Pdu> {
140	self.services
141		.state
142		.get_room_shortstatehash(room_id)
143		.and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
144		.await
145}