1use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join3};
2use ruma::{
3 MxcUri, OwnedMxcUri, OwnedRoomId, RoomId, UserId,
4 events::room::member::{MembershipState, RoomMemberEventContent},
5 profile::ProfileFieldValue,
6 serde::Raw,
7};
8use tuwunel_core::{
9 Result, implement,
10 matrix::PduBuilder,
11 utils::{
12 future::TryExtExt,
13 stream::{IterStream, TryIgnore},
14 },
15};
16use tuwunel_database::{Deserialized, Ignore, Interfix, Json};
17
/// Controls how a change to a user's global profile is pushed into the
/// per-room membership events handled by `update_displayname` and
/// `update_avatar_url`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Propagation {
    /// Every room passed to the update function receives a new membership
    /// event.
    All,

    /// Only rooms whose membership event still carries the user's previous
    /// global value receive a new membership event; rooms where the value
    /// differs are skipped.
    Unchanged,

    /// The global value is stored but no room receives a membership event.
    None,
}
34
35#[inline]
39#[must_use]
40pub fn propagation_default(preserve_room_profile_overrides: bool) -> Propagation {
41 if preserve_room_profile_overrides {
42 Propagation::Unchanged
43 } else {
44 Propagation::All
45 }
46}
47
48#[implement(super::Service)]
49pub async fn update_displayname(
50 &self,
51 user_id: &UserId,
52 displayname: Option<&str>,
53 rooms: &[OwnedRoomId],
54 propagation: Propagation,
55) {
56 let (current_avatar_url, current_blurhash, current_displayname) = join3(
57 self.services.users.avatar_url(user_id).ok(),
58 self.services.users.blurhash(user_id).ok(),
59 self.services.users.displayname(user_id).ok(),
60 )
61 .await;
62
63 if displayname == current_displayname.as_deref() {
64 return;
65 }
66
67 self.services
68 .users
69 .set_displayname(user_id, displayname);
70
71 if matches!(propagation, Propagation::None) {
72 return;
73 }
74
75 let make_pdu = || {
76 PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
77 displayname: displayname.map(ToOwned::to_owned),
78 membership: MembershipState::Join,
79 avatar_url: current_avatar_url.clone(),
80 blurhash: current_blurhash.clone(),
81 join_authorized_via_users_server: None,
82 reason: None,
83 is_direct: None,
84 third_party_invite: None,
85 })
86 };
87
88 let keep = async |room_id: &RoomId| match propagation {
89 | Propagation::All => true,
90 | Propagation::None => false,
91 | Propagation::Unchanged =>
92 self.member_displayname(room_id, user_id)
93 .await
94 .as_deref() == current_displayname.as_deref(),
95 };
96
97 let rooms = rooms
98 .iter()
99 .try_stream()
100 .try_filter(|room_id: &&OwnedRoomId| keep(room_id))
101 .and_then(async |room_id: &OwnedRoomId| Ok((make_pdu(), room_id)))
102 .ignore_err();
103
104 self.update_all_rooms(user_id, rooms)
105 .boxed()
106 .await;
107}
108
109#[implement(super::Service)]
112pub fn set_displayname(&self, user_id: &UserId, displayname: Option<&str>) {
113 if let Some(displayname) = displayname {
114 self.db
115 .userid_displayname
116 .insert(user_id, displayname);
117 } else {
118 self.db.userid_displayname.remove(user_id);
119 }
120}
121
122#[implement(super::Service)]
124pub async fn displayname(&self, user_id: &UserId) -> Result<String> {
125 self.db
126 .userid_displayname
127 .get(user_id)
128 .await
129 .deserialized()
130}
131
132#[implement(super::Service)]
133pub async fn update_avatar_url(
134 &self,
135 user_id: &UserId,
136 avatar_url: Option<&MxcUri>,
137 blurhash: Option<&str>,
138 rooms: &[OwnedRoomId],
139 propagation: Propagation,
140) {
141 let (current_avatar_url, current_blurhash, current_displayname) = join3(
142 self.services.users.avatar_url(user_id).ok(),
143 self.services.users.blurhash(user_id).ok(),
144 self.services.users.displayname(user_id).ok(),
145 )
146 .await;
147
148 if current_avatar_url.as_deref() == avatar_url && current_blurhash.as_deref() == blurhash {
149 return;
150 }
151
152 self.services
153 .users
154 .set_avatar_url(user_id, avatar_url);
155 self.services
156 .users
157 .set_blurhash(user_id, blurhash);
158
159 if matches!(propagation, Propagation::None) {
160 return;
161 }
162
163 let make_pdu = || {
164 PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
165 avatar_url: avatar_url.map(ToOwned::to_owned),
166 blurhash: blurhash.map(ToOwned::to_owned),
167 membership: MembershipState::Join,
168 displayname: current_displayname.clone(),
169 join_authorized_via_users_server: None,
170 reason: None,
171 is_direct: None,
172 third_party_invite: None,
173 })
174 };
175
176 let keep = async |room_id: &RoomId| match propagation {
177 | Propagation::All => true,
178 | Propagation::None => false,
179 | Propagation::Unchanged =>
180 self.member_avatar_url(room_id, user_id)
181 .await
182 .as_deref() == current_avatar_url.as_deref(),
183 };
184
185 let rooms = rooms
186 .iter()
187 .try_stream()
188 .try_filter(|room_id: &&OwnedRoomId| keep(room_id))
189 .and_then(async |room_id: &OwnedRoomId| Ok((make_pdu(), room_id)))
190 .ignore_err();
191
192 self.update_all_rooms(user_id, rooms)
193 .boxed()
194 .await;
195}
196
197#[implement(super::Service)]
199pub fn set_avatar_url(&self, user_id: &UserId, avatar_url: Option<&MxcUri>) {
200 if let Some(avatar_url) = avatar_url {
201 self.db
202 .userid_avatarurl
203 .insert(user_id, avatar_url);
204 } else {
205 self.db.userid_avatarurl.remove(user_id);
206 }
207}
208
209#[implement(super::Service)]
211pub async fn avatar_url(&self, user_id: &UserId) -> Result<OwnedMxcUri> {
212 self.db
213 .userid_avatarurl
214 .get(user_id)
215 .await
216 .deserialized()
217}
218
219#[implement(super::Service)]
221pub fn set_blurhash(&self, user_id: &UserId, blurhash: Option<&str>) {
222 if let Some(blurhash) = blurhash {
223 self.db.userid_blurhash.insert(user_id, blurhash);
224 } else {
225 self.db.userid_blurhash.remove(user_id);
226 }
227}
228
229#[implement(super::Service)]
231pub async fn blurhash(&self, user_id: &UserId) -> Result<String> {
232 self.db
233 .userid_blurhash
234 .get(user_id)
235 .await
236 .deserialized()
237}
238
239#[implement(super::Service)]
241pub fn set_timezone(&self, user_id: &UserId, timezone: Option<&str>) {
242 let key = (user_id, "m.tz");
243
244 if let Some(timezone) = timezone {
245 self.db
246 .useridprofilekey_value
247 .put_raw(key, timezone);
248 } else {
249 self.db.useridprofilekey_value.del(key);
250 }
251}
252
253#[implement(super::Service)]
255pub async fn timezone(&self, user_id: &UserId) -> Result<String> {
256 let stable_key = (user_id, "m.tz");
258 let unstable_key = (user_id, "us.cloke.msc4175.tz");
259 self.db
260 .useridprofilekey_value
261 .qry(&stable_key)
262 .or_else(|_| self.db.useridprofilekey_value.qry(&unstable_key))
263 .await
264 .deserialized()
265}
266
267#[implement(super::Service)]
269pub fn all_profile_keys<'a>(
270 &'a self,
271 user_id: &'a UserId,
272) -> impl Stream<Item = (String, Raw<ProfileFieldValue>)> + 'a + Send {
273 type KeyVal = ((Ignore, String), Raw<ProfileFieldValue>);
274
275 let prefix = (user_id, Interfix);
276 self.db
277 .useridprofilekey_value
278 .stream_prefix(&prefix)
279 .ignore_err()
280 .map(|((_, key), val): KeyVal| (key, val))
281}
282
283#[implement(super::Service)]
285pub fn set_profile_key(
286 &self,
287 user_id: &UserId,
288 profile_key: &str,
289 profile_key_value: Option<&serde_json::Value>,
290) {
291 let key = (user_id, profile_key);
292
293 if let Some(value) = profile_key_value {
294 self.db
295 .useridprofilekey_value
296 .put(key, Json(value));
297 } else {
298 self.db.useridprofilekey_value.del(key);
299 }
300}
301
302#[implement(super::Service)]
304pub async fn profile_key(
305 &self,
306 user_id: &UserId,
307 profile_key: &str,
308) -> Result<Raw<ProfileFieldValue>> {
309 let key = (user_id, profile_key);
310 self.db
311 .useridprofilekey_value
312 .qry(&key)
313 .await
314 .deserialized()
315}
316
317#[implement(super::Service)]
320async fn member_displayname(&self, room_id: &RoomId, user_id: &UserId) -> Option<String> {
321 self.services
322 .state_accessor
323 .get_member(room_id, user_id)
324 .await
325 .ok()
326 .and_then(|m: RoomMemberEventContent| m.displayname)
327}
328
329#[implement(super::Service)]
332async fn member_avatar_url(&self, room_id: &RoomId, user_id: &UserId) -> Option<OwnedMxcUri> {
333 self.services
334 .state_accessor
335 .get_member(room_id, user_id)
336 .await
337 .ok()
338 .and_then(|m: RoomMemberEventContent| m.avatar_url)
339}