tuwunel_service/rooms/state_accessor/
room_state.rs1use futures::{Stream, StreamExt, TryFutureExt};
2use ruma::{OwnedEventId, RoomId, events::StateEventType};
3use serde::Deserialize;
4use tuwunel_core::{
5 Result, err, implement,
6 matrix::{Event, Pdu, StateKey},
7};
8
9#[implement(super::Service)]
11pub async fn room_state_get_content<T>(
12 &self,
13 room_id: &RoomId,
14 event_type: &StateEventType,
15 state_key: &str,
16) -> Result<T>
17where
18 T: for<'de> Deserialize<'de> + Send,
19{
20 self.room_state_get(room_id, event_type, state_key)
21 .await
22 .and_then(|event| event.get_content())
23}
24
25#[implement(super::Service)]
27#[tracing::instrument(skip(self), level = "debug")]
28pub fn room_state_type_pdus<'a>(
29 &'a self,
30 room_id: &'a RoomId,
31 event_type: &'a StateEventType,
32) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
33 self.services
34 .state
35 .get_room_shortstatehash(room_id)
36 .map_ok(|shortstatehash| {
37 self.state_type_pdus(shortstatehash, event_type)
38 .map(Ok)
39 })
40 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
41 .try_flatten_stream()
42}
43
44#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "debug")]
47pub fn room_state_full<'a>(
48 &'a self,
49 room_id: &'a RoomId,
50) -> impl Stream<Item = Result<((StateEventType, StateKey), impl Event)>> + Send + 'a {
51 self.services
52 .state
53 .get_room_shortstatehash(room_id)
54 .map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok))
55 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
56 .try_flatten_stream()
57}
58
59#[implement(super::Service)]
61#[tracing::instrument(skip(self), level = "debug")]
62pub fn room_state_full_pdus<'a>(
63 &'a self,
64 room_id: &'a RoomId,
65) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
66 self.services
67 .state
68 .get_room_shortstatehash(room_id)
69 .map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok))
70 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
71 .try_flatten_stream()
72}
73
74#[implement(super::Service)]
77#[tracing::instrument(skip(self), level = "debug")]
78pub async fn room_state_get_id(
79 &self,
80 room_id: &RoomId,
81 event_type: &StateEventType,
82 state_key: &str,
83) -> Result<OwnedEventId> {
84 self.services
85 .state
86 .get_room_shortstatehash(room_id)
87 .and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
88 .await
89}
90
91#[implement(super::Service)]
94#[tracing::instrument(skip(self), level = "debug")]
95pub fn room_state_keys_with_ids<'a>(
96 &'a self,
97 room_id: &'a RoomId,
98 event_type: &'a StateEventType,
99) -> impl Stream<Item = Result<(StateKey, OwnedEventId)>> + Send + 'a {
100 self.services
101 .state
102 .get_room_shortstatehash(room_id)
103 .map_ok(|shortstatehash| {
104 self.state_keys_with_ids(shortstatehash, event_type)
105 .map(Ok)
106 })
107 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
108 .try_flatten_stream()
109}
110
111#[implement(super::Service)]
113#[tracing::instrument(skip(self), level = "debug")]
114pub fn room_state_keys<'a>(
115 &'a self,
116 room_id: &'a RoomId,
117 event_type: &'a StateEventType,
118) -> impl Stream<Item = Result<StateKey>> + Send + 'a {
119 self.services
120 .state
121 .get_room_shortstatehash(room_id)
122 .map_ok(|shortstatehash| {
123 self.state_keys(shortstatehash, event_type)
124 .map(Ok)
125 })
126 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
127 .try_flatten_stream()
128}
129
130#[implement(super::Service)]
133#[tracing::instrument(skip(self), level = "debug")]
134pub async fn room_state_get(
135 &self,
136 room_id: &RoomId,
137 event_type: &StateEventType,
138 state_key: &str,
139) -> Result<Pdu> {
140 self.services
141 .state
142 .get_room_shortstatehash(room_id)
143 .and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
144 .await
145}