tuwunel_service/rooms/state_accessor/
room_state.rs1use futures::{Stream, StreamExt, TryFutureExt};
2use ruma::{OwnedEventId, RoomId, events::StateEventType};
3use serde::Deserialize;
4use tuwunel_core::{
5 Result, err, implement,
6 matrix::{Event, Pdu, StateKey},
7};
8
9#[implement(super::Service)]
11pub async fn room_state_get_content<T>(
12 &self,
13 room_id: &RoomId,
14 event_type: &StateEventType,
15 state_key: &str,
16) -> Result<T>
17where
18 T: for<'de> Deserialize<'de> + Send,
19{
20 self.room_state_get(room_id, event_type, state_key)
21 .await
22 .and_then(|event| event.get_content())
23}
24
25#[implement(super::Service)]
27#[tracing::instrument(skip(self), level = "debug")]
28pub fn room_state_type_pdus<'a>(
29 &'a self,
30 room_id: &'a RoomId,
31 event_type: &'a StateEventType,
32) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
33 self.services
34 .state
35 .get_room_shortstatehash(room_id)
36 .map_ok(|shortstatehash| {
37 self.state_type_pdus(shortstatehash, event_type)
38 .map(Ok)
39 .boxed()
40 })
41 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
42 .try_flatten_stream()
43}
44
45#[implement(super::Service)]
47#[tracing::instrument(skip(self), level = "debug")]
48pub fn room_state_full<'a>(
49 &'a self,
50 room_id: &'a RoomId,
51) -> impl Stream<Item = Result<((StateEventType, StateKey), impl Event)>> + Send + 'a {
52 self.services
53 .state
54 .get_room_shortstatehash(room_id)
55 .map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok).boxed())
56 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
57 .try_flatten_stream()
58}
59
60#[implement(super::Service)]
62#[tracing::instrument(skip(self), level = "debug")]
63pub fn room_state_full_pdus<'a>(
64 &'a self,
65 room_id: &'a RoomId,
66) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
67 self.services
68 .state
69 .get_room_shortstatehash(room_id)
70 .map_ok(|shortstatehash| {
71 self.state_full_pdus(shortstatehash)
72 .map(Ok)
73 .boxed()
74 })
75 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
76 .try_flatten_stream()
77}
78
79#[implement(super::Service)]
82#[tracing::instrument(skip(self), level = "debug")]
83pub async fn room_state_get_id(
84 &self,
85 room_id: &RoomId,
86 event_type: &StateEventType,
87 state_key: &str,
88) -> Result<OwnedEventId> {
89 self.services
90 .state
91 .get_room_shortstatehash(room_id)
92 .and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
93 .await
94}
95
96#[implement(super::Service)]
99#[tracing::instrument(skip(self), level = "debug")]
100pub fn room_state_keys_with_ids<'a>(
101 &'a self,
102 room_id: &'a RoomId,
103 event_type: &'a StateEventType,
104) -> impl Stream<Item = Result<(StateKey, OwnedEventId)>> + Send + 'a {
105 self.services
106 .state
107 .get_room_shortstatehash(room_id)
108 .map_ok(|shortstatehash| {
109 self.state_keys_with_ids(shortstatehash, event_type)
110 .map(Ok)
111 .boxed()
112 })
113 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
114 .try_flatten_stream()
115}
116
117#[implement(super::Service)]
119#[tracing::instrument(skip(self), level = "debug")]
120pub fn room_state_keys<'a>(
121 &'a self,
122 room_id: &'a RoomId,
123 event_type: &'a StateEventType,
124) -> impl Stream<Item = Result<StateKey>> + Send + 'a {
125 self.services
126 .state
127 .get_room_shortstatehash(room_id)
128 .map_ok(|shortstatehash| {
129 self.state_keys(shortstatehash, event_type)
130 .map(Ok)
131 .boxed()
132 })
133 .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
134 .try_flatten_stream()
135}
136
137#[implement(super::Service)]
140#[tracing::instrument(skip(self), level = "debug")]
141pub async fn room_state_get(
142 &self,
143 room_id: &RoomId,
144 event_type: &StateEventType,
145 state_key: &str,
146) -> Result<Pdu> {
147 self.services
148 .state
149 .get_room_shortstatehash(room_id)
150 .and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
151 .await
152}