Skip to main content

tuwunel_service/rooms/lazy_loading/
mod.rs

1//! Lazy Loading
2
3use std::{collections::HashSet, sync::Arc};
4
5use futures::{Stream, StreamExt, pin_mut};
6use ruma::{DeviceId, OwnedUserId, RoomId, UserId, api::client::filter::LazyLoadOptions};
7use tuwunel_core::{
8	Result, implement,
9	utils::{IterStream, ReadyExt, stream::TryIgnore},
10};
11use tuwunel_database::{Database, Deserialized, Handle, Interfix, Map, Qry};
12
13pub struct Service {
14	db: Data,
15}
16
17struct Data {
18	lazyloadedids: Arc<Map>,
19	db: Arc<Database>,
20}
21
22pub trait Options: Send + Sync {
23	fn is_enabled(&self) -> bool;
24	fn include_redundant_members(&self) -> bool;
25}
26
27#[derive(Clone, Debug)]
28pub struct Context<'a> {
29	pub user_id: &'a UserId,
30	pub device_id: Option<&'a DeviceId>,
31	pub room_id: &'a RoomId,
32	pub token: Option<u64>,
33	pub options: Option<&'a LazyLoadOptions>,
34	pub mode: Mode,
35}
36
37#[derive(Clone, Copy, Debug, Eq, PartialEq)]
38pub enum Mode {
39	Read,
40	Update,
41	Prefetch,
42}
43
44#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum Status {
46	Unseen,
47	Seen(u64),
48}
49
50pub type Witness = HashSet<OwnedUserId>;
51type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId);
52
53impl crate::Service for Service {
54	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
55		Ok(Arc::new(Self {
56			db: Data {
57				lazyloadedids: args.db["lazyloadedids"].clone(),
58				db: args.db.clone(),
59			},
60		}))
61	}
62
63	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
64}
65
66#[implement(Service)]
67#[tracing::instrument(skip(self), level = "debug")]
68pub async fn reset(&self, ctx: &Context<'_>) {
69	let prefix = (ctx.user_id, ctx.device_id, ctx.room_id, Interfix);
70	self.db
71		.lazyloadedids
72		.keys_prefix_raw(&prefix)
73		.ignore_err()
74		.ready_for_each(|key| self.db.lazyloadedids.remove(key))
75		.await;
76}
77
78#[implement(Service)]
79#[tracing::instrument(name = "retain", level = "debug", skip_all)]
80pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness {
81	debug_assert!(
82		ctx.options.is_none_or(Options::is_enabled),
83		"lazy loading should be enabled by your options"
84	);
85
86	let include_redundant = cfg!(feature = "element_hacks")
87		|| ctx
88			.options
89			.is_some_and(Options::include_redundant_members);
90
91	let witness = self
92		.witness(ctx, senders.iter().map(AsRef::as_ref))
93		.zip(senders.iter().stream());
94
95	pin_mut!(witness);
96	let _cork = self.db.db.cork();
97	let mut senders = Witness::with_capacity(senders.len());
98	while let Some((status, sender)) = witness.next().await {
99		if ctx.mode == Mode::Prefetch {
100			continue;
101		}
102
103		if include_redundant || status == Status::Unseen {
104			senders.insert(sender.into());
105			continue;
106		}
107
108		if let Status::Seen(seen) = status
109			&& (seen == 0 || ctx.token == Some(seen))
110		{
111			senders.insert(sender.into());
112			continue;
113		}
114	}
115
116	senders
117}
118
119#[implement(Service)]
120fn witness<'a, I>(
121	&'a self,
122	ctx: &'a Context<'a>,
123	senders: I,
124) -> impl Stream<Item = Status> + Send + 'a
125where
126	I: Iterator<Item = &'a UserId> + Send + Clone + 'a,
127{
128	senders
129		.clone()
130		.stream()
131		.map(|sender| make_key(ctx, sender))
132		.qry(&self.db.lazyloadedids)
133		.map(into_status)
134		.zip(senders.stream())
135		.map(move |(status, sender)| {
136			if matches!(ctx.mode, Mode::Update) {
137				self.update(ctx, &status, sender);
138			}
139
140			status
141		})
142}
143
144#[implement(Service)]
145fn update(&self, ctx: &Context<'_>, status: &Status, sender: &UserId) {
146	if matches!(status, Status::Unseen) {
147		self.db
148			.lazyloadedids
149			.put_aput::<8, _, _>(make_key(ctx, sender), 0_u64);
150	} else if matches!(status, Status::Seen(0)) {
151		self.db
152			.lazyloadedids
153			.put_aput::<8, _, _>(make_key(ctx, sender), ctx.token.unwrap_or(0_u64));
154	}
155}
156
157fn into_status(result: Result<Handle<'_>>) -> Status {
158	match result.and_then(|handle| handle.deserialized()) {
159		| Ok(seen) => Status::Seen(seen),
160		| Err(_) => Status::Unseen,
161	}
162}
163
164fn make_key<'a>(ctx: &'a Context<'a>, sender: &'a UserId) -> Key<'a> {
165	(ctx.user_id, ctx.device_id, ctx.room_id, sender)
166}
167
168impl Options for LazyLoadOptions {
169	fn include_redundant_members(&self) -> bool {
170		if let Self::Enabled { include_redundant_members } = self {
171			*include_redundant_members
172		} else {
173			false
174		}
175	}
176
177	fn is_enabled(&self) -> bool { !self.is_disabled() }
178}