1use std::{
2 collections::BTreeMap,
3 net::IpAddr,
4 sync::atomic::{AtomicBool, Ordering},
5 time::{Duration, Instant},
6};
7
8use axum::extract::State;
9use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
10use ruma::{
11 CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId,
12 OwnedRoomId, OwnedUserId, RoomId, ServerName, TransactionId, UserId,
13 api::{
14 error::ErrorKind,
15 federation::transactions::{
16 edu::{
17 DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
18 PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent,
19 TypingContent,
20 },
21 send_transaction_message,
22 },
23 },
24 events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
25 int,
26 serde::Raw,
27 to_device::DeviceIdOrAllDevices,
28 uint,
29};
30use tuwunel_core::{
31 Err, Error, Result, debug,
32 debug::INFO_SPAN_LEVEL,
33 debug_warn, defer, err, error,
34 itertools::Itertools,
35 result::LogErr,
36 smallvec::SmallVec,
37 trace,
38 utils::{
39 debug::str_truncated,
40 future::TryExtExt,
41 millis_since_unix_epoch,
42 stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, automatic_width},
43 },
44 warn,
45};
46use tuwunel_service::{
47 Services,
48 rooms::state_res::{is_topologically_sorted_in_place, topological_sort},
49 sending::{EDU_LIMIT, PDU_LIMIT},
50};
51
52use crate::{ClientIp, Ruma};
53
54type ResolvedMap = BTreeMap<OwnedEventId, Result>;
55type RoomsPdus = SmallVec<[RoomPdus; 1]>;
56type RoomPdus = (OwnedRoomId, TxnPdus);
57type TxnPdus = SmallVec<[(usize, Pdu); 1]>;
58type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
59
60#[tracing::instrument(
64 name = "txn",
65 level = INFO_SPAN_LEVEL,
66 skip_all,
67 fields(
68 txn = str_truncated(body.transaction_id.as_str(), 20),
69 origin = body.origin().as_str(),
70 %client,
71 ),
72)]
73pub(crate) async fn send_transaction_message_route(
74 State(services): State<crate::State>,
75 ClientIp(client): ClientIp,
76 body: Ruma<send_transaction_message::v1::Request>,
77) -> Result<send_transaction_message::v1::Response> {
78 if body.origin() != body.body.origin {
79 return Err!(Request(Forbidden(
80 "Not allowed to send transactions on behalf of other servers"
81 )));
82 }
83
84 if body.pdus.len() > PDU_LIMIT {
85 return Err!(Request(Forbidden(
86 "Not allowed to send more than {PDU_LIMIT} PDUs in one transaction"
87 )));
88 }
89
90 if body.edus.len() > EDU_LIMIT {
91 return Err!(Request(Forbidden(
92 "Not allowed to send more than {EDU_LIMIT} EDUs in one transaction"
93 )));
94 }
95
96 let txn_start_time = Instant::now();
97 trace!(
98 pdus = body.pdus.len(),
99 edus = body.edus.len(),
100 elapsed = ?txn_start_time.elapsed(),
101 "Starting txn",
102 );
103
104 let pdus = body
105 .pdus
106 .iter()
107 .stream()
108 .enumerate()
109 .broad_filter_map(|(i, pdu)| {
110 services
111 .event_handler
112 .parse_incoming_pdu(pdu)
113 .inspect_err(move |e| debug_warn!("Could not parse PDU[{i}]: {e}"))
114 .map_ok(move |pdu| (i, pdu))
115 .ok()
116 });
117
118 let edus = body
119 .edus
120 .iter()
121 .stream()
122 .enumerate()
123 .ready_filter_map(|(i, edu)| {
124 serde_json::from_str(edu.json().get())
125 .inspect_err(|e| debug_warn!("Could not parse EDU[{i}]: {e}"))
126 .map(|edu| (i, edu))
127 .ok()
128 });
129
130 let results = handle(
131 &services,
132 &client,
133 body.origin(),
134 &body.transaction_id,
135 txn_start_time,
136 pdus,
137 edus,
138 )
139 .await?;
140
141 debug!(
142 pdus = body.pdus.len(),
143 edus = body.edus.len(),
144 elapsed = ?txn_start_time.elapsed(),
145 "Finished txn",
146 );
147
148 for (id, result) in &results {
149 if let Err(e) = result
150 && matches!(e, Error::BadRequest(ErrorKind::NotFound, _))
151 {
152 warn!("Incoming PDU failed {id}: {e:?}");
153 }
154 }
155
156 Ok(send_transaction_message::v1::Response {
157 pdus: results
158 .into_iter()
159 .map(|(e, r)| (e, r.map_err(error::sanitized_message)))
160 .collect(),
161 })
162}
163
164async fn handle(
165 services: &Services,
166 client: &IpAddr,
167 origin: &ServerName,
168 txn_id: &TransactionId,
169 started: Instant,
170 pdus: impl Stream<Item = (usize, Pdu)> + Send,
171 edus: impl Stream<Item = (usize, Edu)> + Send,
172) -> Result<ResolvedMap> {
173 let results = handle_pdus(services, client, origin, txn_id, started, pdus).await?;
174
175 handle_edus(services, client, origin, txn_id, edus).await?;
176
177 Ok(results)
178}
179
180async fn handle_pdus(
181 services: &Services,
182 client: &IpAddr,
183 origin: &ServerName,
184 txn_id: &TransactionId,
185 started: Instant,
186 pdus: impl Stream<Item = (usize, Pdu)> + Send,
187) -> Result<ResolvedMap> {
188 pdus.collect()
189 .map(Ok)
190 .map_ok(|pdus: TxnPdus| {
191 pdus.into_iter()
192 .sorted_by(|(_, (room_a, ..)), (_, (room_b, ..))| room_a.cmp(room_b))
193 .into_grouping_map_by(|(_, (room_id, ..))| room_id.clone())
194 .collect()
195 .into_iter()
196 .try_stream()
197 })
198 .try_flatten_stream()
199 .try_collect::<RoomsPdus>()
200 .map_ok(IntoIterator::into_iter)
201 .map_ok(IterStream::try_stream)
202 .try_flatten_stream()
203 .broad_and_then(async |(room_id, pdus)| {
204 handle_room(services, client, origin, txn_id, started, room_id, pdus)
205 .map_ok(ResolvedMap::into_iter)
206 .map_ok(IterStream::try_stream)
207 .await
208 })
209 .try_flatten()
210 .try_collect()
211 .await
212}
213
214#[tracing::instrument(
215 name = "room",
216 level = INFO_SPAN_LEVEL,
217 skip_all,
218 fields(%room_id)
219)]
220async fn handle_room(
221 services: &Services,
222 _client: &IpAddr,
223 origin: &ServerName,
224 txn_id: &TransactionId,
225 txn_start_time: Instant,
226 ref room_id: OwnedRoomId,
227 pdus: TxnPdus,
228) -> Result<ResolvedMap> {
229 let pdus = sort_pdus(pdus).await;
230
231 services
232 .event_handler
233 .mutex_federation
234 .lock(room_id)
235 .then(async |_lock| {
236 pdus.into_iter()
237 .enumerate()
238 .try_stream()
239 .and_then(async |pdu| {
240 services.server.check_running().map(|()| pdu) })
242 .and_then(|(ri, (ti, (room_id, event_id, value)))| {
243 let meta = (origin, txn_id, txn_start_time, ti);
244 let pdu = (ri, (room_id, event_id, value));
245 handle_pdu(services, meta, pdu).map(Ok)
246 })
247 .try_collect()
248 .await
249 })
250 .await
251}
252
253async fn sort_pdus(mut pdus: TxnPdus) -> TxnPdus {
258 if already_sorted(&pdus) {
259 return pdus;
260 }
261
262 let event_ids: BTreeMap<&str, &OwnedEventId> = pdus
263 .iter()
264 .map(|(_, (_, event_id, _))| (event_id.as_str(), event_id))
265 .collect();
266
267 let graph = pdus
268 .iter()
269 .map(|(_, (_, event_id, value))| {
270 let references = prev_event_ids(value)
271 .filter_map(|prev| event_ids.get(prev).copied())
272 .map(ToOwned::to_owned)
273 .collect();
274
275 (event_id.clone(), references)
276 })
277 .collect();
278
279 let query = async |_event_id: OwnedEventId| {
281 Ok((int!(0).into(), MilliSecondsSinceUnixEpoch(uint!(0))))
282 };
283
284 let Ok(order) = topological_sort(graph, &query).await else {
285 return pdus;
286 };
287
288 let position: BTreeMap<&str, usize> = order
289 .iter()
290 .enumerate()
291 .map(|(i, event_id)| (event_id.as_str(), i))
292 .collect();
293
294 pdus.sort_by_key(|(_, (_, event_id, _))| position.get(event_id.as_str()).copied());
295 pdus
296}
297
298fn already_sorted(pdus: &[(usize, Pdu)]) -> bool {
301 is_topologically_sorted_in_place(
302 pdus,
303 |(_, (_, id, _))| id.as_str(),
304 |(_, (_, _, value))| prev_event_ids(value),
305 )
306}
307
308fn prev_event_ids(value: &CanonicalJsonObject) -> impl Iterator<Item = &str> + '_ {
310 value
311 .get("prev_events")
312 .and_then(CanonicalJsonValue::as_array)
313 .into_iter()
314 .flatten()
315 .filter_map(CanonicalJsonValue::as_str)
316}
317
318#[tracing::instrument(
319 name = "pdu",
320 level = INFO_SPAN_LEVEL,
321 skip_all,
322 fields(%event_id, %ti, %ri)
323)]
324async fn handle_pdu(
325 services: &Services,
326 (origin, txn_id, txn_start_time, ti): (&ServerName, &TransactionId, Instant, usize),
327 (ri, (ref room_id, event_id, value)): (usize, Pdu),
328) -> (OwnedEventId, Result) {
329 let pdu_start_time = Instant::now();
330 let completed: AtomicBool = Default::default();
331 defer! {{
332 if completed.load(Ordering::Acquire) {
333 return;
334 }
335
336 if pdu_start_time.elapsed() >= Duration::from_secs(services.config.client_request_timeout) {
337 error!(
338 %origin, %txn_id, %room_id, %event_id, %ri, %ti,
339 elapsed = ?pdu_start_time.elapsed(),
340 "Incoming transaction processing timed out.",
341 );
342 } else {
343 debug_warn!(
344 %origin, %txn_id, %room_id, %event_id, %ri, %ti,
345 elapsed = ?pdu_start_time.elapsed(),
346 "Incoming transaction processing interrupted.",
347 );
348 }
349 }}
350
351 let result = services
352 .event_handler
353 .handle_incoming_pdu(origin, room_id, &event_id, value, true)
354 .map_ok(|_| ())
355 .boxed()
356 .await;
357
358 completed.store(true, Ordering::Release);
359 debug!(
360 %event_id, ri, ti,
361 pdu_elapsed = ?pdu_start_time.elapsed(),
362 txn_elapsed = ?txn_start_time.elapsed(),
363 "Finished PDU",
364 );
365
366 (event_id.clone(), result)
367}
368
369#[tracing::instrument(name = "edus", level = "debug", skip_all)]
370async fn handle_edus(
371 services: &Services,
372 client: &IpAddr,
373 origin: &ServerName,
374 txn_id: &TransactionId,
375 edus: impl Stream<Item = (usize, Edu)> + Send,
376) -> Result {
377 edus.for_each_concurrent(automatic_width(), |(i, edu)| {
378 handle_edu(services, client, origin, txn_id, i, edu)
379 })
380 .await;
381
382 Ok(())
383}
384
385#[tracing::instrument(
386 name = "edu",
387 level = "debug",
388 skip_all,
389 fields(%i),
390)]
391async fn handle_edu(
392 services: &Services,
393 client: &IpAddr,
394 origin: &ServerName,
395 _txn_id: &TransactionId,
396 i: usize,
397 edu: Edu,
398) {
399 match edu {
400 | Edu::Presence(presence) if services.server.config.allow_incoming_presence =>
401 handle_edu_presence(services, client, origin, presence).await,
402
403 | Edu::Receipt(receipt)
404 if services
405 .server
406 .config
407 .allow_incoming_read_receipts =>
408 handle_edu_receipt(services, client, origin, receipt).await,
409
410 | Edu::Typing(typing) if services.server.config.allow_incoming_typing =>
411 handle_edu_typing(services, client, origin, typing).await,
412
413 | Edu::DeviceListUpdate(content) =>
414 handle_edu_device_list_update(services, client, origin, content).await,
415
416 | Edu::DirectToDevice(content) =>
417 handle_edu_direct_to_device(services, client, origin, content).await,
418
419 | Edu::SigningKeyUpdate(content) =>
420 handle_edu_signing_key_update(services, client, origin, content).await,
421
422 | Edu::_Custom(ref _custom) => debug_warn!(?i, ?edu, "received custom/unknown EDU"),
423
424 | _ => trace!(?i, ?edu, "skipped"),
425 }
426}
427
428async fn handle_edu_presence(
429 services: &Services,
430 _client: &IpAddr,
431 origin: &ServerName,
432 presence: PresenceContent,
433) {
434 presence
435 .push
436 .into_iter()
437 .stream()
438 .for_each_concurrent(automatic_width(), |update| {
439 handle_edu_presence_update(services, origin, update)
440 })
441 .await;
442}
443
444async fn handle_edu_presence_update(
445 services: &Services,
446 origin: &ServerName,
447 update: PresenceUpdate,
448) {
449 if update.user_id.server_name() != origin {
450 debug_warn!(
451 %update.user_id, %origin,
452 "received presence EDU for user not belonging to origin"
453 );
454 return;
455 }
456
457 services
458 .presence
459 .set_presence_from_federation(
460 &update.user_id,
461 &update.presence,
462 update.currently_active,
463 update.last_active_ago,
464 update.status_msg.clone(),
465 )
466 .await
467 .log_err()
468 .ok();
469}
470
471async fn handle_edu_receipt(
472 services: &Services,
473 _client: &IpAddr,
474 origin: &ServerName,
475 receipt: ReceiptContent,
476) {
477 receipt
478 .receipts
479 .into_iter()
480 .stream()
481 .for_each_concurrent(automatic_width(), |(room_id, room_updates)| {
482 handle_edu_receipt_room(services, origin, room_id, room_updates)
483 })
484 .await;
485}
486
487async fn handle_edu_receipt_room(
488 services: &Services,
489 origin: &ServerName,
490 room_id: OwnedRoomId,
491 room_updates: ReceiptMap,
492) {
493 if services
494 .event_handler
495 .acl_check(origin, &room_id)
496 .await
497 .is_err()
498 {
499 debug_warn!(
500 %origin, %room_id,
501 "received read receipt EDU from ACL'd server"
502 );
503 return;
504 }
505
506 let room_id = &room_id;
507 room_updates
508 .read
509 .into_iter()
510 .stream()
511 .for_each_concurrent(automatic_width(), async |(user_id, user_updates)| {
512 handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
513 })
514 .await;
515}
516
517async fn handle_edu_receipt_room_user(
518 services: &Services,
519 origin: &ServerName,
520 room_id: &RoomId,
521 user_id: &UserId,
522 user_updates: ReceiptData,
523) {
524 if user_id.server_name() != origin {
525 debug_warn!(
526 %user_id, %origin,
527 "received read receipt EDU for user not belonging to origin"
528 );
529 return;
530 }
531
532 if !services
533 .state_cache
534 .server_in_room(origin, room_id)
535 .await
536 {
537 debug_warn!(
538 %user_id, %room_id, %origin,
539 "received read receipt EDU from server who does not have a member in the room",
540 );
541 return;
542 }
543
544 let data = &user_updates.data;
545 user_updates
546 .event_ids
547 .into_iter()
548 .stream()
549 .for_each_concurrent(automatic_width(), async |event_id| {
550 let user_data = [(user_id.to_owned(), data.clone())];
551 let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
552 let content = [(event_id.clone(), BTreeMap::from(receipts))];
553 services
554 .read_receipt
555 .readreceipt_update(user_id, room_id, &ReceiptEvent {
556 content: ReceiptEventContent(content.into()),
557 room_id: room_id.to_owned(),
558 })
559 .await;
560 })
561 .await;
562}
563
564async fn handle_edu_typing(
565 services: &Services,
566 _client: &IpAddr,
567 origin: &ServerName,
568 typing: TypingContent,
569) {
570 if typing.user_id.server_name() != origin {
571 debug_warn!(
572 %typing.user_id, %origin,
573 "received typing EDU for user not belonging to origin"
574 );
575 return;
576 }
577
578 if services
579 .event_handler
580 .acl_check(typing.user_id.server_name(), &typing.room_id)
581 .await
582 .is_err()
583 {
584 debug_warn!(
585 %typing.user_id, %typing.room_id, %origin,
586 "received typing EDU for ACL'd user's server"
587 );
588 return;
589 }
590
591 if !services
592 .state_cache
593 .is_joined(&typing.user_id, &typing.room_id)
594 .await
595 {
596 debug_warn!(
597 %typing.user_id, %typing.room_id, %origin,
598 "received typing EDU for user not in room"
599 );
600 return;
601 }
602
603 if typing.typing {
604 let secs = services.server.config.typing_federation_timeout_s;
605 let timeout = millis_since_unix_epoch().saturating_add(secs.saturating_mul(1000));
606
607 services
608 .typing
609 .typing_add(&typing.user_id, &typing.room_id, timeout)
610 .await
611 .log_err()
612 .ok();
613 } else {
614 services
615 .typing
616 .typing_remove(&typing.user_id, &typing.room_id)
617 .await
618 .log_err()
619 .ok();
620 }
621}
622
623async fn handle_edu_device_list_update(
624 services: &Services,
625 _client: &IpAddr,
626 origin: &ServerName,
627 content: DeviceListUpdateContent,
628) {
629 let DeviceListUpdateContent { user_id, .. } = content;
630
631 if user_id.server_name() != origin {
632 debug_warn!(
633 %user_id, %origin,
634 "received device list update EDU for user not belonging to origin"
635 );
636 return;
637 }
638
639 services
640 .users
641 .mark_device_key_update(&user_id)
642 .await;
643}
644
645async fn handle_edu_direct_to_device(
646 services: &Services,
647 _client: &IpAddr,
648 origin: &ServerName,
649 content: DirectDeviceContent,
650) {
651 let DirectDeviceContent {
652 ref sender,
653 ref ev_type,
654 ref message_id,
655 messages,
656 } = content;
657
658 if sender.server_name() != origin {
659 debug_warn!(
660 %sender, %origin,
661 "received direct to device EDU for user not belonging to origin"
662 );
663 return;
664 }
665
666 if services
668 .transaction_ids
669 .existing_txnid(sender, None, message_id)
670 .await
671 .is_ok()
672 {
673 return;
674 }
675
676 let ev_type = ev_type.to_string();
678 messages
679 .into_iter()
680 .stream()
681 .for_each_concurrent(automatic_width(), |(target_user_id, map)| {
682 handle_edu_direct_to_device_user(services, target_user_id, sender, &ev_type, map)
683 })
684 .await;
685
686 services
688 .transaction_ids
689 .add_txnid(sender, None, message_id, &[]);
690}
691
692async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
693 services: &Services,
694 target_user_id: OwnedUserId,
695 sender: &UserId,
696 ev_type: &str,
697 map: BTreeMap<DeviceIdOrAllDevices, Raw<Event>>,
698) {
699 map.into_iter()
700 .stream()
701 .ready_filter_map(|(tid, raw)| {
702 raw.deserialize_as()
703 .map_err(|e| {
704 err!(Request(InvalidParam(error!("To-Device event is invalid: {e}"))))
705 })
706 .ok()
707 .map(|ev| (tid, ev))
708 })
709 .for_each_concurrent(automatic_width(), |(tid, ev)| {
710 handle_edu_direct_to_device_event(services, &target_user_id, sender, tid, ev_type, ev)
711 })
712 .await;
713}
714
715async fn handle_edu_direct_to_device_event(
716 services: &Services,
717 target_user_id: &UserId,
718 sender: &UserId,
719 target_device_id_maybe: DeviceIdOrAllDevices,
720 ev_type: &str,
721 event: serde_json::Value,
722) {
723 match target_device_id_maybe {
724 | DeviceIdOrAllDevices::DeviceId(ref target_device_id) => {
725 services.users.add_to_device_event(
726 sender,
727 target_user_id,
728 target_device_id,
729 ev_type,
730 &event,
731 );
732 },
733
734 | DeviceIdOrAllDevices::AllDevices => {
735 services
736 .users
737 .all_device_ids(target_user_id)
738 .ready_for_each(|target_device_id| {
739 services.users.add_to_device_event(
740 sender,
741 target_user_id,
742 target_device_id,
743 ev_type,
744 &event,
745 );
746 })
747 .await;
748 },
749 }
750}
751
752async fn handle_edu_signing_key_update(
753 services: &Services,
754 _client: &IpAddr,
755 origin: &ServerName,
756 content: SigningKeyUpdateContent,
757) {
758 let SigningKeyUpdateContent { user_id, master_key, self_signing_key } = content;
759
760 if user_id.server_name() != origin {
761 debug_warn!(
762 %user_id, %origin,
763 "received signing key update EDU from server that does not belong to user's server"
764 );
765 return;
766 }
767
768 services
769 .users
770 .add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)
771 .await
772 .log_err()
773 .ok();
774}
775
#[cfg(test)]
mod tests {
    use ruma::{CanonicalJsonObject, OwnedEventId, event_id, room_id};
    use serde_json::json;

    use super::{Pdu, TxnPdus, already_sorted, prev_event_ids, sort_pdus};

    /// Builds a PDU at transaction index `index` whose `prev_events` lists `prev`.
    fn pdu(index: usize, id: &OwnedEventId, prev: &[&OwnedEventId]) -> (usize, Pdu) {
        let prev_events = prev
            .iter()
            .map(|parent| parent.as_str())
            .collect::<Vec<&str>>();

        let content: CanonicalJsonObject =
            serde_json::from_value(json!({ "prev_events": prev_events }))
                .expect("valid canonical json");

        let room = room_id!("!r:example.com").to_owned();

        (index, (room, id.clone(), content))
    }

    /// Three distinct event IDs: `$a`, `$b` and `$c`.
    fn ids() -> (OwnedEventId, OwnedEventId, OwnedEventId) {
        let a = event_id!("$a:example.com").to_owned();
        let b = event_id!("$b:example.com").to_owned();
        let c = event_id!("$c:example.com").to_owned();

        (a, b, c)
    }

    /// Event IDs of `pdus` in their current order.
    fn order(pdus: &[(usize, Pdu)]) -> Vec<&str> {
        pdus.iter()
            .map(|(_, event)| event.1.as_str())
            .collect()
    }

    /// A chain listed from oldest to newest counts as sorted.
    #[test]
    fn sorted_when_parents_lead() {
        let (a, b, c) = ids();
        let chain = [pdu(0, &a, &[]), pdu(1, &b, &[&a]), pdu(2, &c, &[&b])];

        assert!(already_sorted(&chain));
    }

    /// A child listed before its parent is not sorted.
    #[test]
    fn unsorted_when_child_leads() {
        let (a, b, _c) = ids();
        let reversed = [pdu(0, &b, &[&a]), pdu(1, &a, &[])];

        assert!(!already_sorted(&reversed));
    }

    /// References to events outside the batch do not affect the check.
    #[test]
    fn sorted_ignores_out_of_batch_references() {
        let (a, b, c) = ids();
        let siblings = [pdu(0, &b, &[&c]), pdu(1, &a, &[&c])];

        assert!(already_sorted(&siblings));
    }

    /// A fully reversed chain comes back oldest-first.
    #[tokio::test]
    async fn sort_orders_parents_before_children() {
        let (a, b, c) = ids();
        let reversed = TxnPdus::from_iter([pdu(0, &c, &[&b]), pdu(1, &b, &[&a]), pdu(2, &a, &[])]);

        let sorted = sort_pdus(reversed).await;

        assert_eq!(order(&sorted), ["$a:example.com", "$b:example.com", "$c:example.com"]);
    }

    /// An already ordered batch keeps its order.
    #[tokio::test]
    async fn sort_is_noop_when_already_ordered() {
        let (a, b, c) = ids();
        let chain = TxnPdus::from_iter([pdu(0, &a, &[]), pdu(1, &b, &[&a]), pdu(2, &c, &[&b])]);

        let sorted = sort_pdus(chain.clone()).await;

        assert_eq!(order(&sorted), order(&chain));
    }

    /// Entries sharing an event ID are all kept.
    #[tokio::test]
    async fn sort_preserves_duplicates() {
        let (a, b, _c) = ids();
        let with_dup = TxnPdus::from_iter([pdu(0, &b, &[&a]), pdu(1, &a, &[]), pdu(2, &b, &[&a])]);

        let sorted = sort_pdus(with_dup).await;

        assert_eq!(sorted.len(), 3);
    }

    /// Two events referencing each other are both kept.
    #[tokio::test]
    async fn sort_preserves_a_cycle() {
        let (a, b, _c) = ids();
        let cycle = TxnPdus::from_iter([pdu(0, &a, &[&b]), pdu(1, &b, &[&a])]);

        let sorted = sort_pdus(cycle).await;

        assert_eq!(sorted.len(), 2);
    }

    /// The entries of `prev_events` are yielded as strings.
    #[test]
    fn prev_event_ids_reads_the_array() {
        let (a, b, _c) = ids();
        let (_, (_, _, content)) = pdu(0, &a, &[&b]);

        let found: Vec<&str> = prev_event_ids(&content).collect();

        assert_eq!(found, ["$b:example.com"]);
    }

    /// An object without `prev_events` yields nothing.
    #[test]
    fn prev_event_ids_empty_when_absent() {
        let empty = CanonicalJsonObject::new();

        assert_eq!(prev_event_ids(&empty).count(), 0);
    }
}