Skip to main content

tuwunel_api/server/
send.rs

1use std::{
2	collections::BTreeMap,
3	net::IpAddr,
4	sync::atomic::{AtomicBool, Ordering},
5	time::{Duration, Instant},
6};
7
8use axum::extract::State;
9use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
10use ruma::{
11	CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId,
12	OwnedRoomId, OwnedUserId, RoomId, ServerName, TransactionId, UserId,
13	api::{
14		error::ErrorKind,
15		federation::transactions::{
16			edu::{
17				DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
18				PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
19				TypingContent,
20			},
21			send_transaction_message,
22		},
23	},
24	events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
25	int,
26	serde::Raw,
27	to_device::DeviceIdOrAllDevices,
28	uint,
29};
30use tuwunel_core::{
31	Err, Error, Result, debug,
32	debug::INFO_SPAN_LEVEL,
33	debug_warn, defer, err, error,
34	itertools::Itertools,
35	result::LogErr,
36	smallvec::SmallVec,
37	trace,
38	utils::{
39		debug::str_truncated,
40		future::TryExtExt,
41		millis_since_unix_epoch,
42		stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, automatic_width},
43	},
44	warn,
45};
46use tuwunel_service::{
47	Services,
48	rooms::state_res::{is_topologically_sorted_in_place, topological_sort},
49	sending::{EDU_LIMIT, PDU_LIMIT},
50};
51
52use crate::{ClientIp, Ruma};
53
/// Outcome of handling each PDU of a transaction, keyed by event ID.
type ResolvedMap = BTreeMap<OwnedEventId, Result>;
/// The PDUs of a transaction grouped per room.
type RoomsPdus = SmallVec<[RoomPdus; 1]>;
/// A room ID paired with the PDUs that belong to that room.
type RoomPdus = (OwnedRoomId, TxnPdus);
/// PDUs, each tagged with its index within the incoming transaction.
type TxnPdus = SmallVec<[(usize, Pdu); 1]>;
/// A parsed PDU: its room ID, its event ID and the event as canonical JSON.
type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
59
60/// # `PUT /_matrix/federation/v1/send/{txnId}`
61///
62/// Push EDUs and PDUs to this server.
63#[tracing::instrument(
64	name = "txn",
65	level = INFO_SPAN_LEVEL,
66	skip_all,
67	fields(
68		txn = str_truncated(body.transaction_id.as_str(), 20),
69		origin = body.origin().as_str(),
70		%client,
71	),
72)]
73pub(crate) async fn send_transaction_message_route(
74	State(services): State<crate::State>,
75	ClientIp(client): ClientIp,
76	body: Ruma<send_transaction_message::v1::Request>,
77) -> Result<send_transaction_message::v1::Response> {
78	if body.origin() != body.body.origin {
79		return Err!(Request(Forbidden(
80			"Not allowed to send transactions on behalf of other servers"
81		)));
82	}
83
84	if body.pdus.len() > PDU_LIMIT {
85		return Err!(Request(Forbidden(
86			"Not allowed to send more than {PDU_LIMIT} PDUs in one transaction"
87		)));
88	}
89
90	if body.edus.len() > EDU_LIMIT {
91		return Err!(Request(Forbidden(
92			"Not allowed to send more than {EDU_LIMIT} EDUs in one transaction"
93		)));
94	}
95
96	let txn_start_time = Instant::now();
97	trace!(
98		pdus = body.pdus.len(),
99		edus = body.edus.len(),
100		elapsed = ?txn_start_time.elapsed(),
101		"Starting txn",
102	);
103
104	let pdus = body
105		.pdus
106		.iter()
107		.stream()
108		.enumerate()
109		.broad_filter_map(|(i, pdu)| {
110			services
111				.event_handler
112				.parse_incoming_pdu(pdu)
113				.inspect_err(move |e| debug_warn!("Could not parse PDU[{i}]: {e}"))
114				.map_ok(move |pdu| (i, pdu))
115				.ok()
116		});
117
118	let edus = body
119		.edus
120		.iter()
121		.stream()
122		.enumerate()
123		.ready_filter_map(|(i, edu)| {
124			serde_json::from_str(edu.json().get())
125				.inspect_err(|e| debug_warn!("Could not parse EDU[{i}]: {e}"))
126				.map(|edu| (i, edu))
127				.ok()
128		});
129
130	let results = handle(
131		&services,
132		&client,
133		body.origin(),
134		&body.transaction_id,
135		txn_start_time,
136		pdus,
137		edus,
138	)
139	.await?;
140
141	debug!(
142		pdus = body.pdus.len(),
143		edus = body.edus.len(),
144		elapsed = ?txn_start_time.elapsed(),
145		"Finished txn",
146	);
147
148	for (id, result) in &results {
149		if let Err(e) = result
150			&& matches!(e, Error::BadRequest(ErrorKind::NotFound, _))
151		{
152			warn!("Incoming PDU failed {id}: {e:?}");
153		}
154	}
155
156	Ok(send_transaction_message::v1::Response {
157		pdus: results
158			.into_iter()
159			.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
160			.collect(),
161	})
162}
163
164async fn handle(
165	services: &Services,
166	client: &IpAddr,
167	origin: &ServerName,
168	txn_id: &TransactionId,
169	started: Instant,
170	pdus: impl Stream<Item = (usize, Pdu)> + Send,
171	edus: impl Stream<Item = (usize, Edu)> + Send,
172) -> Result<ResolvedMap> {
173	let results = handle_pdus(services, client, origin, txn_id, started, pdus).await?;
174
175	handle_edus(services, client, origin, txn_id, edus).await?;
176
177	Ok(results)
178}
179
/// Handle every PDU of a transaction and gather the per-event outcomes.
///
/// The incoming stream is collected in full, the PDUs are ordered by room ID
/// and grouped per room, and each room's group is passed to `handle_room`.
/// The maps returned for the individual rooms are merged into one
/// `ResolvedMap`. An error returned by `handle_room` ends the whole
/// operation with that error.
async fn handle_pdus(
	services: &Services,
	client: &IpAddr,
	origin: &ServerName,
	txn_id: &TransactionId,
	started: Instant,
	pdus: impl Stream<Item = (usize, Pdu)> + Send,
) -> Result<ResolvedMap> {
	// Gather the whole transaction; wrapping in `Ok` allows the fallible
	// combinators below to be chained.
	pdus.collect()
		.map(Ok)
		.map_ok(|pdus: TxnPdus| {
			// Order by room, then bucket the PDUs by their room ID.
			pdus.into_iter()
				.sorted_by(|(_, (room_a, ..)), (_, (room_b, ..))| room_a.cmp(room_b))
				.into_grouping_map_by(|(_, (room_id, ..))| room_id.clone())
				.collect()
				.into_iter()
				.try_stream()
		})
		.try_flatten_stream()
		.try_collect::<RoomsPdus>()
		.map_ok(IntoIterator::into_iter)
		.map_ok(IterStream::try_stream)
		.try_flatten_stream()
		// NOTE(review): `broad_and_then` presumably drives several rooms at once;
		// confirm against `TryBroadbandExt`.
		.broad_and_then(async |(room_id, pdus)| {
			// Turn each room's result map into a stream of entries so the rooms
			// can be flattened into a single map.
			handle_room(services, client, origin, txn_id, started, room_id, pdus)
				.map_ok(ResolvedMap::into_iter)
				.map_ok(IterStream::try_stream)
				.await
		})
		.try_flatten()
		.try_collect()
		.await
}
213
/// Handle the PDUs of a single room from one transaction.
///
/// The PDUs are first reordered by `sort_pdus`. The room's `mutex_federation`
/// lock is then taken and kept for the whole batch while the PDUs are handled
/// one after another through `handle_pdu`. Before each PDU the server's
/// running state is checked; a failed check ends the batch with that error.
///
/// Returns the outcome of every handled PDU keyed by event ID.
#[tracing::instrument(
	name = "room",
	level = INFO_SPAN_LEVEL,
	skip_all,
	fields(%room_id)
)]
async fn handle_room(
	services: &Services,
	_client: &IpAddr,
	origin: &ServerName,
	txn_id: &TransactionId,
	txn_start_time: Instant,
	ref room_id: OwnedRoomId,
	pdus: TxnPdus,
) -> Result<ResolvedMap> {
	let pdus = sort_pdus(pdus).await;

	services
		.event_handler
		.mutex_federation
		.lock(room_id)
		// `_lock` keeps the guard alive until every PDU of the room is handled.
		.then(async |_lock| {
			pdus.into_iter()
				// `ri` is the position within the room's (sorted) batch; `ti` is
				// the position the PDU had in the transaction.
				.enumerate()
				.try_stream()
				.and_then(async |pdu| {
					services.server.check_running().map(|()| pdu) // interruption point
				})
				.and_then(|(ri, (ti, (room_id, event_id, value)))| {
					let meta = (origin, txn_id, txn_start_time, ti);
					let pdu = (ri, (room_id, event_id, value));
					// A failing PDU is recorded in the map rather than ending the
					// stream, hence the unconditional `Ok`.
					handle_pdu(services, meta, pdu).map(Ok)
				})
				.try_collect()
				.await
		})
		.await
}
252
253/// Reorder a room's transaction PDUs so each event follows the in-batch events
254/// it references. An already-ordered batch is returned unchanged; references to
255/// events outside the batch are non-edges. The sort is an optimization, so a
256/// failure falls back to the arrival order.
257async fn sort_pdus(mut pdus: TxnPdus) -> TxnPdus {
258	if already_sorted(&pdus) {
259		return pdus;
260	}
261
262	let event_ids: BTreeMap<&str, &OwnedEventId> = pdus
263		.iter()
264		.map(|(_, (_, event_id, _))| (event_id.as_str(), event_id))
265		.collect();
266
267	let graph = pdus
268		.iter()
269		.map(|(_, (_, event_id, value))| {
270			let references = prev_event_ids(value)
271				.filter_map(|prev| event_ids.get(prev).copied())
272				.map(ToOwned::to_owned)
273				.collect();
274
275			(event_id.clone(), references)
276		})
277		.collect();
278
279	// Causal order alone matters here, so the tie-break inputs are constant.
280	let query = async |_event_id: OwnedEventId| {
281		Ok((int!(0).into(), MilliSecondsSinceUnixEpoch(uint!(0))))
282	};
283
284	let Ok(order) = topological_sort(graph, &query).await else {
285		return pdus;
286	};
287
288	let position: BTreeMap<&str, usize> = order
289		.iter()
290		.enumerate()
291		.map(|(i, event_id)| (event_id.as_str(), i))
292		.collect();
293
294	pdus.sort_by_key(|(_, (_, event_id, _))| position.get(event_id.as_str()).copied());
295	pdus
296}
297
/// Whether the batch is already in causal order, in which case the sort can be
/// skipped.
///
/// The check is delegated to `is_topologically_sorted_in_place`, which is
/// given each PDU's event ID as its identity and its `prev_events` as the
/// events it refers to.
fn already_sorted(pdus: &[(usize, Pdu)]) -> bool {
	is_topologically_sorted_in_place(
		pdus,
		// Identity of an entry: its event ID.
		|(_, (_, id, _))| id.as_str(),
		// References of an entry: the IDs listed under `prev_events`.
		|(_, (_, _, value))| prev_event_ids(value),
	)
}
307
308/// The `prev_events` of a PDU held as canonical JSON.
309fn prev_event_ids(value: &CanonicalJsonObject) -> impl Iterator<Item = &str> + '_ {
310	value
311		.get("prev_events")
312		.and_then(CanonicalJsonValue::as_array)
313		.into_iter()
314		.flatten()
315		.filter_map(CanonicalJsonValue::as_str)
316}
317
/// Handle one PDU and report its outcome together with its event ID.
///
/// * `origin`, `txn_id`, `txn_start_time`, `ti`: the sending server, the
///   transaction, the time the transaction started and the PDU's index within
///   the transaction.
/// * `ri`, `room_id`, `event_id`, `value`: the PDU's index within its room's
///   batch and the parsed PDU itself.
///
/// The event is passed to `handle_incoming_pdu`; its success value is
/// discarded so only success or the error remains. If this function is left
/// before that call has finished, the deferred block logs the event as either
/// timed out or interrupted.
#[tracing::instrument(
	name = "pdu",
	level = INFO_SPAN_LEVEL,
	skip_all,
	fields(%event_id, %ti, %ri)
)]
async fn handle_pdu(
	services: &Services,
	(origin, txn_id, txn_start_time, ti): (&ServerName, &TransactionId, Instant, usize),
	(ri, (ref room_id, event_id, value)): (usize, Pdu),
) -> (OwnedEventId, Result) {
	let pdu_start_time = Instant::now();
	// Set once handling has finished; read by the deferred block below.
	let completed: AtomicBool = Default::default();
	// NOTE(review): `defer!` presumably runs this block when the scope is left,
	// including when the future is dropped part-way; confirm against the macro.
	defer! {{
		if completed.load(Ordering::Acquire) {
			return;
		}

		// Not completed: tell a timeout apart from any other interruption by
		// comparing the elapsed time with the configured request timeout.
		if pdu_start_time.elapsed() >= Duration::from_secs(services.config.client_request_timeout) {
			error!(
				%origin, %txn_id, %room_id, %event_id, %ri, %ti,
				elapsed = ?pdu_start_time.elapsed(),
				"Incoming transaction processing timed out.",
			);
		} else {
			debug_warn!(
				%origin, %txn_id, %room_id, %event_id, %ri, %ti,
				elapsed = ?pdu_start_time.elapsed(),
				"Incoming transaction processing interrupted.",
			);
		}
	}}

	let result = services
		.event_handler
		.handle_incoming_pdu(origin, room_id, &event_id, value, true)
		.map_ok(|_| ())
		.boxed()
		.await;

	completed.store(true, Ordering::Release);
	debug!(
		%event_id, ri, ti,
		pdu_elapsed = ?pdu_start_time.elapsed(),
		txn_elapsed = ?txn_start_time.elapsed(),
		"Finished PDU",
	);

	// The deferred block still refers to `event_id`, so a copy is returned.
	(event_id.clone(), result)
}
368
369#[tracing::instrument(name = "edus", level = "debug", skip_all)]
370async fn handle_edus(
371	services: &Services,
372	client: &IpAddr,
373	origin: &ServerName,
374	txn_id: &TransactionId,
375	edus: impl Stream<Item = (usize, Edu)> + Send,
376) -> Result {
377	edus.for_each_concurrent(automatic_width(), |(i, edu)| {
378		handle_edu(services, client, origin, txn_id, i, edu)
379	})
380	.await;
381
382	Ok(())
383}
384
385#[tracing::instrument(
386	name = "edu",
387	level = "debug",
388	skip_all,
389	fields(%i),
390)]
391async fn handle_edu(
392	services: &Services,
393	client: &IpAddr,
394	origin: &ServerName,
395	_txn_id: &TransactionId,
396	i: usize,
397	edu: Edu,
398) {
399	match edu {
400		| Edu::Presence(presence) if services.server.config.allow_incoming_presence =>
401			handle_edu_presence(services, client, origin, presence).await,
402
403		| Edu::Receipt(receipt)
404			if services
405				.server
406				.config
407				.allow_incoming_read_receipts =>
408			handle_edu_receipt(services, client, origin, receipt).await,
409
410		| Edu::Typing(typing) if services.server.config.allow_incoming_typing =>
411			handle_edu_typing(services, client, origin, typing).await,
412
413		| Edu::DeviceListUpdate(content) =>
414			handle_edu_device_list_update(services, client, origin, content).await,
415
416		| Edu::DirectToDevice(content) =>
417			handle_edu_direct_to_device(services, client, origin, content).await,
418
419		| Edu::SigningKeyUpdate(content) =>
420			handle_edu_signing_key_update(services, client, origin, content).await,
421
422		| Edu::_Custom(ref _custom) => debug_warn!(?i, ?edu, "received custom/unknown EDU"),
423
424		| _ => trace!(?i, ?edu, "skipped"),
425	}
426}
427
428async fn handle_edu_presence(
429	services: &Services,
430	_client: &IpAddr,
431	origin: &ServerName,
432	presence: PresenceContent,
433) {
434	presence
435		.push
436		.into_iter()
437		.stream()
438		.for_each_concurrent(automatic_width(), |update| {
439			handle_edu_presence_update(services, origin, update)
440		})
441		.await;
442}
443
444async fn handle_edu_presence_update(
445	services: &Services,
446	origin: &ServerName,
447	update: PresenceUpdate,
448) {
449	if update.user_id.server_name() != origin {
450		debug_warn!(
451			%update.user_id, %origin,
452			"received presence EDU for user not belonging to origin"
453		);
454		return;
455	}
456
457	services
458		.presence
459		.set_presence_from_federation(
460			&update.user_id,
461			&update.presence,
462			update.currently_active,
463			update.last_active_ago,
464			update.status_msg.clone(),
465		)
466		.await
467		.log_err()
468		.ok();
469}
470
471async fn handle_edu_receipt(
472	services: &Services,
473	_client: &IpAddr,
474	origin: &ServerName,
475	receipt: ReceiptContent,
476) {
477	receipt
478		.receipts
479		.into_iter()
480		.stream()
481		.for_each_concurrent(automatic_width(), |(room_id, room_updates)| {
482			handle_edu_receipt_room(services, origin, room_id, room_updates)
483		})
484		.await;
485}
486
487async fn handle_edu_receipt_room(
488	services: &Services,
489	origin: &ServerName,
490	room_id: OwnedRoomId,
491	room_updates: ReceiptMap,
492) {
493	if services
494		.event_handler
495		.acl_check(origin, &room_id)
496		.await
497		.is_err()
498	{
499		debug_warn!(
500			%origin, %room_id,
501			"received read receipt EDU from ACL'd server"
502		);
503		return;
504	}
505
506	let room_id = &room_id;
507	room_updates
508		.read
509		.into_iter()
510		.stream()
511		.for_each_concurrent(automatic_width(), async |(user_id, user_updates)| {
512			handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
513		})
514		.await;
515}
516
517async fn handle_edu_receipt_room_user(
518	services: &Services,
519	origin: &ServerName,
520	room_id: &RoomId,
521	user_id: &UserId,
522	user_updates: ReceiptData,
523) {
524	if user_id.server_name() != origin {
525		debug_warn!(
526			%user_id, %origin,
527			"received read receipt EDU for user not belonging to origin"
528		);
529		return;
530	}
531
532	if !services
533		.state_cache
534		.server_in_room(origin, room_id)
535		.await
536	{
537		debug_warn!(
538			%user_id, %room_id, %origin,
539			"received read receipt EDU from server who does not have a member in the room",
540		);
541		return;
542	}
543
544	let data = &user_updates.data;
545	user_updates
546		.event_ids
547		.into_iter()
548		.stream()
549		.for_each_concurrent(automatic_width(), async |event_id| {
550			let user_data = [(user_id.to_owned(), data.clone())];
551			let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
552			let content = [(event_id.clone(), BTreeMap::from(receipts))];
553			services
554				.read_receipt
555				.readreceipt_update(user_id, room_id, &ReceiptEvent {
556					content: ReceiptEventContent(content.into()),
557					room_id: room_id.to_owned(),
558				})
559				.await;
560		})
561		.await;
562}
563
564async fn handle_edu_typing(
565	services: &Services,
566	_client: &IpAddr,
567	origin: &ServerName,
568	typing: TypingContent,
569) {
570	if typing.user_id.server_name() != origin {
571		debug_warn!(
572			%typing.user_id, %origin,
573			"received typing EDU for user not belonging to origin"
574		);
575		return;
576	}
577
578	if services
579		.event_handler
580		.acl_check(typing.user_id.server_name(), &typing.room_id)
581		.await
582		.is_err()
583	{
584		debug_warn!(
585			%typing.user_id, %typing.room_id, %origin,
586			"received typing EDU for ACL'd user's server"
587		);
588		return;
589	}
590
591	if !services
592		.state_cache
593		.is_joined(&typing.user_id, &typing.room_id)
594		.await
595	{
596		debug_warn!(
597			%typing.user_id, %typing.room_id, %origin,
598			"received typing EDU for user not in room"
599		);
600		return;
601	}
602
603	if typing.typing {
604		let secs = services.server.config.typing_federation_timeout_s;
605		let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000));
606
607		services
608			.typing
609			.typing_add(&typing.user_id, &typing.room_id, timeout)
610			.await
611			.log_err()
612			.ok();
613	} else {
614		services
615			.typing
616			.typing_remove(&typing.user_id, &typing.room_id)
617			.await
618			.log_err()
619			.ok();
620	}
621}
622
623async fn handle_edu_device_list_update(
624	services: &Services,
625	_client: &IpAddr,
626	origin: &ServerName,
627	content: DeviceListUpdateContent,
628) {
629	let DeviceListUpdateContent { user_id, .. } = content;
630
631	if user_id.server_name() != origin {
632		debug_warn!(
633			%user_id, %origin,
634			"received device list update EDU for user not belonging to origin"
635		);
636		return;
637	}
638
639	services
640		.users
641		.mark_device_key_update(&user_id)
642		.await;
643}
644
645async fn handle_edu_direct_to_device(
646	services: &Services,
647	_client: &IpAddr,
648	origin: &ServerName,
649	content: DirectDeviceContent,
650) {
651	let DirectDeviceContent {
652		ref sender,
653		ref ev_type,
654		ref message_id,
655		messages,
656	} = content;
657
658	if sender.server_name() != origin {
659		debug_warn!(
660			%sender, %origin,
661			"received direct to device EDU for user not belonging to origin"
662		);
663		return;
664	}
665
666	// Check if this is a new transaction id
667	if services
668		.transaction_ids
669		.existing_txnid(sender, None, message_id)
670		.await
671		.is_ok()
672	{
673		return;
674	}
675
676	// process messages concurrently for different users
677	let ev_type = ev_type.to_string();
678	messages
679		.into_iter()
680		.stream()
681		.for_each_concurrent(automatic_width(), |(target_user_id, map)| {
682			handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map)
683		})
684		.await;
685
686	// Save transaction id with empty data
687	services
688		.transaction_ids
689		.add_txnid(sender, None, message_id, &[]);
690}
691
692async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
693	services: &Services,
694	target_user_id: OwnedUserId,
695	sender: &UserId,
696	ev_type: &str,
697	map: BTreeMap<DeviceIdOrAllDevices, Raw<Event>>,
698) {
699	map.into_iter()
700		.stream()
701		.ready_filter_map(|(tid, raw)| {
702			raw.deserialize_as()
703				.map_err(|e| {
704					err!(Request(InvalidParam(error!("To-Device event is invalid: {e}"))))
705				})
706				.ok()
707				.map(|ev| (tid, ev))
708		})
709		.for_each_concurrent(automatic_width(), |(tid, ev)| {
710			handle_edu_direct_to_device_event(services, &target_user_id, sender, tid, ev_type, ev)
711		})
712		.await;
713}
714
715async fn handle_edu_direct_to_device_event(
716	services: &Services,
717	target_user_id: &UserId,
718	sender: &UserId,
719	target_device_id_maybe: DeviceIdOrAllDevices,
720	ev_type: &str,
721	event: serde_json::Value,
722) {
723	match target_device_id_maybe {
724		| DeviceIdOrAllDevices::DeviceId(ref target_device_id) => {
725			services.users.add_to_device_event(
726				sender,
727				target_user_id,
728				target_device_id,
729				ev_type,
730				&event,
731			);
732		},
733
734		| DeviceIdOrAllDevices::AllDevices => {
735			services
736				.users
737				.all_device_ids(target_user_id)
738				.ready_for_each(|target_device_id| {
739					services.users.add_to_device_event(
740						sender,
741						target_user_id,
742						target_device_id,
743						ev_type,
744						&event,
745					);
746				})
747				.await;
748		},
749	}
750}
751
752async fn handle_edu_signing_key_update(
753	services: &Services,
754	_client: &IpAddr,
755	origin: &ServerName,
756	content: SigningKeyUpdateContent,
757) {
758	let SigningKeyUpdateContent { user_id, master_key, self_signing_key } = content;
759
760	if user_id.server_name() != origin {
761		debug_warn!(
762			%user_id, %origin,
763			"received signing key update EDU from server that does not belong to user's server"
764		);
765		return;
766	}
767
768	services
769		.users
770		.add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)
771		.await
772		.log_err()
773		.ok();
774}
775
#[cfg(test)]
mod tests {
	use ruma::{CanonicalJsonObject, OwnedEventId, event_id, room_id};
	use serde_json::json;

	use super::{Pdu, TxnPdus, already_sorted, prev_event_ids, sort_pdus};

	/// Build a transaction entry for event `id` whose `prev_events` are `prev`.
	fn pdu(index: usize, id: &OwnedEventId, prev: &[&OwnedEventId]) -> (usize, Pdu) {
		let parents: Vec<&str> = prev
			.iter()
			.map(|parent| parent.as_str())
			.collect();

		let object = json!({ "prev_events": parents });
		let value: CanonicalJsonObject =
			serde_json::from_value(object).expect("valid canonical json");

		let room = room_id!("!r:example.com").to_owned();

		(index, (room, id.clone(), value))
	}

	/// Three distinct event IDs shared by the tests.
	fn ids() -> (OwnedEventId, OwnedEventId, OwnedEventId) {
		let a = event_id!("$a:example.com").to_owned();
		let b = event_id!("$b:example.com").to_owned();
		let c = event_id!("$c:example.com").to_owned();

		(a, b, c)
	}

	/// The event IDs of a batch in the order they appear.
	fn order(pdus: &[(usize, Pdu)]) -> Vec<&str> {
		let mut ids = Vec::with_capacity(pdus.len());
		for (_, (_, id, _)) in pdus {
			ids.push(id.as_str());
		}

		ids
	}

	#[test]
	fn sorted_when_parents_lead() {
		let (a, b, c) = ids();
		let batch = [pdu(0, &a, &[]), pdu(1, &b, &[&a]), pdu(2, &c, &[&b])];

		assert!(already_sorted(&batch));
	}

	#[test]
	fn unsorted_when_child_leads() {
		let (a, b, _c) = ids();
		let batch = [pdu(0, &b, &[&a]), pdu(1, &a, &[])];

		assert!(!already_sorted(&batch));
	}

	#[test]
	fn sorted_ignores_out_of_batch_references() {
		let (a, b, c) = ids();
		let batch = [pdu(0, &b, &[&c]), pdu(1, &a, &[&c])];

		assert!(already_sorted(&batch));
	}

	#[tokio::test]
	async fn sort_orders_parents_before_children() {
		let (a, b, c) = ids();
		let reversed = [pdu(0, &c, &[&b]), pdu(1, &b, &[&a]), pdu(2, &a, &[])];
		let batch: TxnPdus = reversed.into_iter().collect();

		let sorted = sort_pdus(batch).await;

		assert_eq!(order(&sorted), ["$a:example.com", "$b:example.com", "$c:example.com"]);
	}

	#[tokio::test]
	async fn sort_is_noop_when_already_ordered() {
		let (a, b, c) = ids();
		let ordered = [pdu(0, &a, &[]), pdu(1, &b, &[&a]), pdu(2, &c, &[&b])];
		let batch: TxnPdus = ordered.into_iter().collect();

		let sorted = sort_pdus(batch.clone()).await;

		assert_eq!(order(&sorted), order(&batch));
	}

	#[tokio::test]
	async fn sort_preserves_duplicates() {
		let (a, b, _c) = ids();
		let with_duplicate = [pdu(0, &b, &[&a]), pdu(1, &a, &[]), pdu(2, &b, &[&a])];
		let batch: TxnPdus = with_duplicate.into_iter().collect();

		let sorted = sort_pdus(batch).await;

		assert_eq!(sorted.len(), 3);
	}

	#[tokio::test]
	async fn sort_preserves_a_cycle() {
		let (a, b, _c) = ids();
		let cyclic = [pdu(0, &a, &[&b]), pdu(1, &b, &[&a])];
		let batch: TxnPdus = cyclic.into_iter().collect();

		let sorted = sort_pdus(batch).await;

		assert_eq!(sorted.len(), 2);
	}

	#[test]
	fn prev_event_ids_reads_the_array() {
		let (a, b, _c) = ids();
		let (_, (_, _, value)) = pdu(0, &a, &[&b]);

		let found: Vec<&str> = prev_event_ids(&value).collect();

		assert_eq!(found, ["$b:example.com"]);
	}

	#[test]
	fn prev_event_ids_empty_when_absent() {
		let empty = CanonicalJsonObject::new();

		assert_eq!(prev_event_ids(&empty).count(), 0);
	}
}