Skip to main content

tuwunel_service/threepid/
binding.rs

1use futures::{Stream, StreamExt};
2use ruma::{
3	MilliSecondsSinceUnixEpoch, OwnedUserId, UserId,
4	thirdparty::{Medium, ThirdPartyIdentifier, ThirdPartyIdentifierInit},
5};
6use tuwunel_core::{Result, implement, utils::stream::TryIgnore};
7use tuwunel_database::{Cbor, Deserialized, Ignore, Interfix};
8
9use super::Binding;
10
11/// Persist a binding in both directions: the forward `(user, email)` row with
12/// its metadata, and the reverse `email -> user` lookup.
13#[implement(super::Service)]
14#[tracing::instrument(
15	level = "debug",
16	skip(self),
17	fields(
18		%user_id,
19	),
20)]
21pub async fn put_binding(
22	&self,
23	user_id: &UserId,
24	email_canon: &str,
25	medium: Medium,
26	validated_at: MilliSecondsSinceUnixEpoch,
27	added_at: MilliSecondsSinceUnixEpoch,
28) {
29	let binding = Binding { medium, validated_at, added_at };
30
31	self.db
32		.userid_email
33		.put((user_id, email_canon), Cbor(binding));
34
35	self.db.email_userid.raw_put(email_canon, user_id);
36}
37
38/// All third-party identifiers bound to `user_id`, lazily decoded from the
39/// `(user, email)` prefix scan.
40#[implement(super::Service)]
41#[tracing::instrument(
42	level = "debug",
43	skip(self),
44	fields(
45		%user_id,
46	),
47)]
48pub fn get_bindings<'a>(
49	&'a self,
50	user_id: &'a UserId,
51) -> impl Stream<Item = ThirdPartyIdentifier> + Send + 'a {
52	type KeyVal = ((Ignore, String), Cbor<Binding>);
53
54	self.db
55		.userid_email
56		.stream_prefix(&(user_id, Interfix))
57		.ignore_err()
58		.map(|((_, address), Cbor(binding)): KeyVal| {
59			ThirdPartyIdentifierInit {
60				address,
61				medium: binding.medium,
62				validated_at: binding.validated_at,
63				added_at: binding.added_at,
64			}
65			.into()
66		})
67}
68
69/// Remove a binding in both directions; blind-delete, tolerant of an absent
70/// row. The reverse lookup is removed only when it still maps to this user, so
71/// one user's delete cannot wipe another's reverse row.
72#[implement(super::Service)]
73#[tracing::instrument(
74	level = "debug",
75	skip(self),
76	fields(
77		%user_id,
78	),
79)]
80pub async fn del_binding(&self, user_id: &UserId, email_canon: &str) {
81	self.db.userid_email.del((user_id, email_canon));
82
83	if self
84		.user_id_for_email(email_canon)
85		.await
86		.ok()
87		.flatten()
88		.is_some_and(|bound| bound == user_id)
89	{
90		self.db.email_userid.remove(email_canon);
91	}
92}
93
94/// The user bound to a canonical email address, if any.
95#[implement(super::Service)]
96#[tracing::instrument(level = "debug", skip(self))]
97pub async fn user_id_for_email(&self, email_canon: &str) -> Result<Option<OwnedUserId>> {
98	self.db
99		.email_userid
100		.get(email_canon)
101		.await
102		.ok()
103		.map(|handle| handle.deserialized())
104		.transpose()
105}
106
107/// Whether a canonical email address is already bound to some user.
108#[implement(super::Service)]
109#[tracing::instrument(level = "debug", skip(self))]
110pub async fn address_in_use(&self, email_canon: &str) -> bool {
111	self.db
112		.email_userid
113		.get(email_canon)
114		.await
115		.is_ok()
116}