1use crate::{
8 models::{
9 NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10 },
11 repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
19#[derive(Debug)]
20pub struct InMemoryTransactionRepository {
21 store: Mutex<HashMap<String, TransactionRepoModel>>,
22}
23
24impl Clone for InMemoryTransactionRepository {
25 fn clone(&self) -> Self {
26 let data = self
28 .store
29 .try_lock()
30 .map(|guard| guard.clone())
31 .unwrap_or_else(|_| HashMap::new());
32
33 Self {
34 store: Mutex::new(data),
35 }
36 }
37}
38
39impl InMemoryTransactionRepository {
40 pub fn new() -> Self {
41 Self {
42 store: Mutex::new(HashMap::new()),
43 }
44 }
45
46 async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47 Ok(lock.lock().await)
48 }
49}
50
51#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55 async fn create(
56 &self,
57 tx: TransactionRepoModel,
58 ) -> Result<TransactionRepoModel, RepositoryError> {
59 let mut store = Self::acquire_lock(&self.store).await?;
60 if store.contains_key(&tx.id) {
61 return Err(RepositoryError::ConstraintViolation(format!(
62 "Transaction with ID {} already exists",
63 tx.id
64 )));
65 }
66 store.insert(tx.id.clone(), tx.clone());
67 Ok(tx)
68 }
69
70 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71 let store = Self::acquire_lock(&self.store).await?;
72 store
73 .get(&id)
74 .cloned()
75 .ok_or_else(|| RepositoryError::NotFound(format!("Transaction with ID {id} not found")))
76 }
77
78 #[allow(clippy::map_entry)]
79 async fn update(
80 &self,
81 id: String,
82 tx: TransactionRepoModel,
83 ) -> Result<TransactionRepoModel, RepositoryError> {
84 let mut store = Self::acquire_lock(&self.store).await?;
85 if store.contains_key(&id) {
86 let mut updated_tx = tx;
87 updated_tx.id = id.clone();
88 store.insert(id, updated_tx.clone());
89 Ok(updated_tx)
90 } else {
91 Err(RepositoryError::NotFound(format!(
92 "Transaction with ID {id} not found"
93 )))
94 }
95 }
96
97 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98 let mut store = Self::acquire_lock(&self.store).await?;
99 if store.remove(&id).is_some() {
100 Ok(())
101 } else {
102 Err(RepositoryError::NotFound(format!(
103 "Transaction with ID {id} not found"
104 )))
105 }
106 }
107
108 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
109 let store = Self::acquire_lock(&self.store).await?;
110 Ok(store.values().cloned().collect())
111 }
112
113 async fn list_paginated(
114 &self,
115 query: PaginationQuery,
116 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
117 let total = self.count().await?;
118 let start = ((query.page - 1) * query.per_page) as usize;
119 let store = Self::acquire_lock(&self.store).await?;
120 let items: Vec<TransactionRepoModel> = store
121 .values()
122 .skip(start)
123 .take(query.per_page as usize)
124 .cloned()
125 .collect();
126
127 Ok(PaginatedResult {
128 items,
129 total: total as u64,
130 page: query.page,
131 per_page: query.per_page,
132 })
133 }
134
135 async fn count(&self) -> Result<usize, RepositoryError> {
136 let store = Self::acquire_lock(&self.store).await?;
137 Ok(store.len())
138 }
139
140 async fn has_entries(&self) -> Result<bool, RepositoryError> {
141 let store = Self::acquire_lock(&self.store).await?;
142 Ok(!store.is_empty())
143 }
144
145 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
146 let mut store = Self::acquire_lock(&self.store).await?;
147 store.clear();
148 Ok(())
149 }
150}
151
152#[async_trait]
153impl TransactionRepository for InMemoryTransactionRepository {
154 async fn find_by_relayer_id(
155 &self,
156 relayer_id: &str,
157 query: PaginationQuery,
158 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
159 let store = Self::acquire_lock(&self.store).await?;
160 let filtered: Vec<TransactionRepoModel> = store
161 .values()
162 .filter(|tx| tx.relayer_id == relayer_id)
163 .cloned()
164 .collect();
165
166 let total = filtered.len() as u64;
167
168 if total == 0 {
169 return Ok(PaginatedResult::<TransactionRepoModel> {
170 items: vec![],
171 total: 0,
172 page: query.page,
173 per_page: query.per_page,
174 });
175 }
176
177 let start = ((query.page - 1) * query.per_page) as usize;
178
179 let items = filtered
181 .into_iter()
182 .sorted_by(|a, b| b.created_at.cmp(&a.created_at)) .skip(start)
184 .take(query.per_page as usize)
185 .collect();
186
187 Ok(PaginatedResult {
188 items,
189 total,
190 page: query.page,
191 per_page: query.per_page,
192 })
193 }
194
195 async fn find_by_status(
196 &self,
197 relayer_id: &str,
198 statuses: &[TransactionStatus],
199 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
200 let store = Self::acquire_lock(&self.store).await?;
201 let filtered: Vec<TransactionRepoModel> = store
202 .values()
203 .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
204 .cloned()
205 .collect();
206
207 let sorted = filtered
209 .into_iter()
210 .sorted_by(|a, b| b.created_at.cmp(&a.created_at))
211 .collect();
212
213 Ok(sorted)
214 }
215
216 async fn find_by_nonce(
217 &self,
218 relayer_id: &str,
219 nonce: u64,
220 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
221 let store = Self::acquire_lock(&self.store).await?;
222 let filtered: Vec<TransactionRepoModel> = store
223 .values()
224 .filter(|tx| {
225 tx.relayer_id == relayer_id
226 && match &tx.network_data {
227 NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
228 _ => false,
229 }
230 })
231 .cloned()
232 .collect();
233
234 Ok(filtered.into_iter().next())
235 }
236
237 async fn update_status(
238 &self,
239 tx_id: String,
240 status: TransactionStatus,
241 ) -> Result<TransactionRepoModel, RepositoryError> {
242 let update = TransactionUpdateRequest {
243 status: Some(status),
244 ..Default::default()
245 };
246 self.partial_update(tx_id, update).await
247 }
248
249 async fn partial_update(
250 &self,
251 tx_id: String,
252 update: TransactionUpdateRequest,
253 ) -> Result<TransactionRepoModel, RepositoryError> {
254 let mut store = Self::acquire_lock(&self.store).await?;
255
256 if let Some(tx) = store.get_mut(&tx_id) {
257 tx.apply_partial_update(update);
259 Ok(tx.clone())
260 } else {
261 Err(RepositoryError::NotFound(format!(
262 "Transaction with ID {tx_id} not found"
263 )))
264 }
265 }
266
267 async fn update_network_data(
268 &self,
269 tx_id: String,
270 network_data: NetworkTransactionData,
271 ) -> Result<TransactionRepoModel, RepositoryError> {
272 let mut tx = self.get_by_id(tx_id.clone()).await?;
273 tx.network_data = network_data;
274 self.update(tx_id, tx).await
275 }
276
277 async fn set_sent_at(
278 &self,
279 tx_id: String,
280 sent_at: String,
281 ) -> Result<TransactionRepoModel, RepositoryError> {
282 let mut tx = self.get_by_id(tx_id.clone()).await?;
283 tx.sent_at = Some(sent_at);
284 self.update(tx_id, tx).await
285 }
286
287 async fn set_confirmed_at(
288 &self,
289 tx_id: String,
290 confirmed_at: String,
291 ) -> Result<TransactionRepoModel, RepositoryError> {
292 let mut tx = self.get_by_id(tx_id.clone()).await?;
293 tx.confirmed_at = Some(confirmed_at);
294 self.update(tx_id, tx).await
295 }
296}
297
298impl Default for InMemoryTransactionRepository {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
307 use lazy_static::lazy_static;
308 use std::str::FromStr;
309
310 use crate::models::U256;
311
312 use super::*;
313
314 use tokio::sync::Mutex;
315
316 lazy_static! {
317 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
318 }
319 fn create_test_transaction(id: &str) -> TransactionRepoModel {
321 TransactionRepoModel {
322 id: id.to_string(),
323 relayer_id: "relayer-1".to_string(),
324 status: TransactionStatus::Pending,
325 status_reason: None,
326 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
327 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
328 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
329 valid_until: None,
330 delete_at: None,
331 network_type: NetworkType::Evm,
332 priced_at: None,
333 hashes: vec![],
334 network_data: NetworkTransactionData::Evm(EvmTransactionData {
335 gas_price: Some(1000000000),
336 gas_limit: Some(21000),
337 nonce: Some(1),
338 value: U256::from_str("1000000000000000000").unwrap(),
339 data: Some("0x".to_string()),
340 from: "0xSender".to_string(),
341 to: Some("0xRecipient".to_string()),
342 chain_id: 1,
343 signature: None,
344 hash: Some(format!("0x{}", id)),
345 speed: Some(Speed::Fast),
346 max_fee_per_gas: None,
347 max_priority_fee_per_gas: None,
348 raw: None,
349 }),
350 noop_count: None,
351 is_canceled: Some(false),
352 }
353 }
354
355 fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
356 TransactionRepoModel {
357 id: id.to_string(),
358 relayer_id: "relayer-1".to_string(),
359 status: TransactionStatus::Pending,
360 status_reason: None,
361 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
362 sent_at: None,
363 confirmed_at: None,
364 valid_until: None,
365 delete_at: None,
366 network_type: NetworkType::Evm,
367 priced_at: None,
368 hashes: vec![],
369 network_data: NetworkTransactionData::Evm(EvmTransactionData {
370 gas_price: Some(1000000000),
371 gas_limit: Some(21000),
372 nonce: Some(1),
373 value: U256::from_str("1000000000000000000").unwrap(),
374 data: Some("0x".to_string()),
375 from: "0xSender".to_string(),
376 to: Some("0xRecipient".to_string()),
377 chain_id: 1,
378 signature: None,
379 hash: Some(format!("0x{}", id)),
380 speed: Some(Speed::Fast),
381 max_fee_per_gas: None,
382 max_priority_fee_per_gas: None,
383 raw: None,
384 }),
385 noop_count: None,
386 is_canceled: Some(false),
387 }
388 }
389
390 #[tokio::test]
391 async fn test_create_transaction() {
392 let repo = InMemoryTransactionRepository::new();
393 let tx = create_test_transaction("test-1");
394
395 let result = repo.create(tx.clone()).await.unwrap();
396 assert_eq!(result.id, tx.id);
397 assert_eq!(repo.count().await.unwrap(), 1);
398 }
399
400 #[tokio::test]
401 async fn test_get_transaction() {
402 let repo = InMemoryTransactionRepository::new();
403 let tx = create_test_transaction("test-1");
404
405 repo.create(tx.clone()).await.unwrap();
406 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
407 if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
408 if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
409 assert_eq!(stored_data.hash, tx_data.hash);
410 }
411 }
412 }
413
414 #[tokio::test]
415 async fn test_update_transaction() {
416 let repo = InMemoryTransactionRepository::new();
417 let mut tx = create_test_transaction("test-1");
418
419 repo.create(tx.clone()).await.unwrap();
420 tx.status = TransactionStatus::Confirmed;
421
422 let updated = repo.update("test-1".to_string(), tx).await.unwrap();
423 assert!(matches!(updated.status, TransactionStatus::Confirmed));
424 }
425
426 #[tokio::test]
427 async fn test_delete_transaction() {
428 let repo = InMemoryTransactionRepository::new();
429 let tx = create_test_transaction("test-1");
430
431 repo.create(tx).await.unwrap();
432 repo.delete_by_id("test-1".to_string()).await.unwrap();
433
434 let result = repo.get_by_id("test-1".to_string()).await;
435 assert!(result.is_err());
436 }
437
438 #[tokio::test]
439 async fn test_list_all_transactions() {
440 let repo = InMemoryTransactionRepository::new();
441 let tx1 = create_test_transaction("test-1");
442 let tx2 = create_test_transaction("test-2");
443
444 repo.create(tx1).await.unwrap();
445 repo.create(tx2).await.unwrap();
446
447 let transactions = repo.list_all().await.unwrap();
448 assert_eq!(transactions.len(), 2);
449 }
450
451 #[tokio::test]
452 async fn test_count_transactions() {
453 let repo = InMemoryTransactionRepository::new();
454 let tx = create_test_transaction("test-1");
455
456 assert_eq!(repo.count().await.unwrap(), 0);
457 repo.create(tx).await.unwrap();
458 assert_eq!(repo.count().await.unwrap(), 1);
459 }
460
461 #[tokio::test]
462 async fn test_get_nonexistent_transaction() {
463 let repo = InMemoryTransactionRepository::new();
464 let result = repo.get_by_id("nonexistent".to_string()).await;
465 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
466 }
467
468 #[tokio::test]
469 async fn test_duplicate_transaction_creation() {
470 let repo = InMemoryTransactionRepository::new();
471 let tx = create_test_transaction("test-1");
472
473 repo.create(tx.clone()).await.unwrap();
474 let result = repo.create(tx).await;
475
476 assert!(matches!(
477 result,
478 Err(RepositoryError::ConstraintViolation(_))
479 ));
480 }
481
482 #[tokio::test]
483 async fn test_update_nonexistent_transaction() {
484 let repo = InMemoryTransactionRepository::new();
485 let tx = create_test_transaction("test-1");
486
487 let result = repo.update("nonexistent".to_string(), tx).await;
488 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
489 }
490
491 #[tokio::test]
492 async fn test_partial_update() {
493 let repo = InMemoryTransactionRepository::new();
494 let tx = create_test_transaction_pending_state("test-tx-id");
495 repo.create(tx.clone()).await.unwrap();
496
497 let update1 = TransactionUpdateRequest {
499 status: Some(TransactionStatus::Sent),
500 status_reason: None,
501 sent_at: None,
502 confirmed_at: None,
503 network_data: None,
504 hashes: None,
505 priced_at: None,
506 noop_count: None,
507 is_canceled: None,
508 delete_at: None,
509 };
510 let updated_tx1 = repo
511 .partial_update("test-tx-id".to_string(), update1)
512 .await
513 .unwrap();
514 assert_eq!(updated_tx1.status, TransactionStatus::Sent);
515 assert_eq!(updated_tx1.sent_at, None);
516
517 let update2 = TransactionUpdateRequest {
519 status: Some(TransactionStatus::Confirmed),
520 status_reason: None,
521 sent_at: Some("2023-01-01T12:00:00Z".to_string()),
522 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
523 network_data: None,
524 hashes: None,
525 priced_at: None,
526 noop_count: None,
527 is_canceled: None,
528 delete_at: None,
529 };
530 let updated_tx2 = repo
531 .partial_update("test-tx-id".to_string(), update2)
532 .await
533 .unwrap();
534 assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
535 assert_eq!(
536 updated_tx2.sent_at,
537 Some("2023-01-01T12:00:00Z".to_string())
538 );
539 assert_eq!(
540 updated_tx2.confirmed_at,
541 Some("2023-01-01T12:05:00Z".to_string())
542 );
543
544 let update3 = TransactionUpdateRequest {
546 status: Some(TransactionStatus::Failed),
547 status_reason: None,
548 sent_at: None,
549 confirmed_at: None,
550 network_data: None,
551 hashes: None,
552 priced_at: None,
553 noop_count: None,
554 is_canceled: None,
555 delete_at: None,
556 };
557 let result = repo
558 .partial_update("non-existent-id".to_string(), update3)
559 .await;
560 assert!(result.is_err());
561 assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
562 }
563
564 #[tokio::test]
565 async fn test_update_status() {
566 let repo = InMemoryTransactionRepository::new();
567 let tx = create_test_transaction("test-1");
568
569 repo.create(tx).await.unwrap();
570
571 let updated = repo
573 .update_status("test-1".to_string(), TransactionStatus::Confirmed)
574 .await
575 .unwrap();
576
577 assert_eq!(updated.status, TransactionStatus::Confirmed);
579
580 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
582 assert_eq!(stored.status, TransactionStatus::Confirmed);
583
584 let updated = repo
586 .update_status("test-1".to_string(), TransactionStatus::Failed)
587 .await
588 .unwrap();
589
590 assert_eq!(updated.status, TransactionStatus::Failed);
592
593 let result = repo
595 .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
596 .await;
597 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
598 }
599
600 #[tokio::test]
601 async fn test_list_paginated() {
602 let repo = InMemoryTransactionRepository::new();
603
604 for i in 1..=10 {
606 let tx = create_test_transaction(&format!("test-{}", i));
607 repo.create(tx).await.unwrap();
608 }
609
610 let query = PaginationQuery {
612 page: 1,
613 per_page: 3,
614 };
615 let result = repo.list_paginated(query).await.unwrap();
616 assert_eq!(result.items.len(), 3);
617 assert_eq!(result.total, 10);
618 assert_eq!(result.page, 1);
619 assert_eq!(result.per_page, 3);
620
621 let query = PaginationQuery {
623 page: 2,
624 per_page: 3,
625 };
626 let result = repo.list_paginated(query).await.unwrap();
627 assert_eq!(result.items.len(), 3);
628 assert_eq!(result.total, 10);
629 assert_eq!(result.page, 2);
630 assert_eq!(result.per_page, 3);
631
632 let query = PaginationQuery {
634 page: 4,
635 per_page: 3,
636 };
637 let result = repo.list_paginated(query).await.unwrap();
638 assert_eq!(result.items.len(), 1);
639 assert_eq!(result.total, 10);
640 assert_eq!(result.page, 4);
641 assert_eq!(result.per_page, 3);
642
643 let query = PaginationQuery {
645 page: 5,
646 per_page: 3,
647 };
648 let result = repo.list_paginated(query).await.unwrap();
649 assert_eq!(result.items.len(), 0);
650 assert_eq!(result.total, 10);
651 }
652
653 #[tokio::test]
654 async fn test_find_by_nonce() {
655 let repo = InMemoryTransactionRepository::new();
656
657 let tx1 = create_test_transaction("test-1");
659
660 let mut tx2 = create_test_transaction("test-2");
661 if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
662 data.nonce = Some(2);
663 }
664
665 let mut tx3 = create_test_transaction("test-3");
666 tx3.relayer_id = "relayer-2".to_string();
667 if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
668 data.nonce = Some(1);
669 }
670
671 repo.create(tx1).await.unwrap();
672 repo.create(tx2).await.unwrap();
673 repo.create(tx3).await.unwrap();
674
675 let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
677 assert!(result.is_some());
678 assert_eq!(result.as_ref().unwrap().id, "test-1");
679
680 let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
682 assert!(result.is_some());
683 assert_eq!(result.as_ref().unwrap().id, "test-2");
684
685 let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
687 assert!(result.is_some());
688 assert_eq!(result.as_ref().unwrap().id, "test-3");
689
690 let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
692 assert!(result.is_none());
693 }
694
695 #[tokio::test]
696 async fn test_update_network_data() {
697 let repo = InMemoryTransactionRepository::new();
698 let tx = create_test_transaction("test-1");
699
700 repo.create(tx.clone()).await.unwrap();
701
702 let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
704 gas_price: Some(2000000000),
705 gas_limit: Some(30000),
706 nonce: Some(2),
707 value: U256::from_str("2000000000000000000").unwrap(),
708 data: Some("0xUpdated".to_string()),
709 from: "0xSender".to_string(),
710 to: Some("0xRecipient".to_string()),
711 chain_id: 1,
712 signature: None,
713 hash: Some("0xUpdated".to_string()),
714 raw: None,
715 speed: None,
716 max_fee_per_gas: None,
717 max_priority_fee_per_gas: None,
718 });
719
720 let updated = repo
721 .update_network_data("test-1".to_string(), updated_network_data)
722 .await
723 .unwrap();
724
725 if let NetworkTransactionData::Evm(data) = &updated.network_data {
727 assert_eq!(data.gas_price, Some(2000000000));
728 assert_eq!(data.gas_limit, Some(30000));
729 assert_eq!(data.nonce, Some(2));
730 assert_eq!(data.hash, Some("0xUpdated".to_string()));
731 assert_eq!(data.data, Some("0xUpdated".to_string()));
732 } else {
733 panic!("Expected EVM network data");
734 }
735 }
736
737 #[tokio::test]
738 async fn test_set_sent_at() {
739 let repo = InMemoryTransactionRepository::new();
740 let tx = create_test_transaction("test-1");
741
742 repo.create(tx).await.unwrap();
743
744 let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
746
747 let updated = repo
748 .set_sent_at("test-1".to_string(), new_sent_at.clone())
749 .await
750 .unwrap();
751
752 assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
754
755 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
757 assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
758 }
759
760 #[tokio::test]
761 async fn test_set_confirmed_at() {
762 let repo = InMemoryTransactionRepository::new();
763 let tx = create_test_transaction("test-1");
764
765 repo.create(tx).await.unwrap();
766
767 let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
769
770 let updated = repo
771 .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
772 .await
773 .unwrap();
774
775 assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
777
778 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
780 assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
781 }
782
783 #[tokio::test]
784 async fn test_find_by_relayer_id() {
785 let repo = InMemoryTransactionRepository::new();
786 let tx1 = create_test_transaction("test-1");
787 let tx2 = create_test_transaction("test-2");
788
789 let mut tx3 = create_test_transaction("test-3");
791 tx3.relayer_id = "relayer-2".to_string();
792
793 repo.create(tx1).await.unwrap();
794 repo.create(tx2).await.unwrap();
795 repo.create(tx3).await.unwrap();
796
797 let query = PaginationQuery {
799 page: 1,
800 per_page: 10,
801 };
802 let result = repo
803 .find_by_relayer_id("relayer-1", query.clone())
804 .await
805 .unwrap();
806 assert_eq!(result.total, 2);
807 assert_eq!(result.items.len(), 2);
808 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
809
810 let result = repo
812 .find_by_relayer_id("relayer-2", query.clone())
813 .await
814 .unwrap();
815 assert_eq!(result.total, 1);
816 assert_eq!(result.items.len(), 1);
817 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
818
819 let result = repo
821 .find_by_relayer_id("non-existent", query.clone())
822 .await
823 .unwrap();
824 assert_eq!(result.total, 0);
825 assert_eq!(result.items.len(), 0);
826 }
827
828 #[tokio::test]
829 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
830 let repo = InMemoryTransactionRepository::new();
831
832 let mut tx1 = create_test_transaction("test-1");
834 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction("test-2");
837 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction("test-3");
840 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
848 page: 1,
849 per_page: 10,
850 };
851 let result = repo.find_by_relayer_id("relayer-1", query).await.unwrap();
852
853 assert_eq!(result.total, 3);
854 assert_eq!(result.items.len(), 3);
855
856 assert_eq!(
858 result.items[0].id, "test-3",
859 "First item should be newest (test-3)"
860 );
861 assert_eq!(
862 result.items[0].created_at,
863 "2025-01-27T14:00:00.000000+00:00"
864 );
865
866 assert_eq!(
867 result.items[1].id, "test-2",
868 "Second item should be middle (test-2)"
869 );
870 assert_eq!(
871 result.items[1].created_at,
872 "2025-01-27T12:00:00.000000+00:00"
873 );
874
875 assert_eq!(
876 result.items[2].id, "test-1",
877 "Third item should be oldest (test-1)"
878 );
879 assert_eq!(
880 result.items[2].created_at,
881 "2025-01-27T10:00:00.000000+00:00"
882 );
883 }
884
885 #[tokio::test]
886 async fn test_find_by_status() {
887 let repo = InMemoryTransactionRepository::new();
888 let tx1 = create_test_transaction_pending_state("tx1");
889 let mut tx2 = create_test_transaction_pending_state("tx2");
890 tx2.status = TransactionStatus::Submitted;
891 let mut tx3 = create_test_transaction_pending_state("tx3");
892 tx3.relayer_id = "relayer-2".to_string();
893 tx3.status = TransactionStatus::Pending;
894
895 repo.create(tx1.clone()).await.unwrap();
896 repo.create(tx2.clone()).await.unwrap();
897 repo.create(tx3.clone()).await.unwrap();
898
899 let pending_txs = repo
901 .find_by_status("relayer-1", &[TransactionStatus::Pending])
902 .await
903 .unwrap();
904 assert_eq!(pending_txs.len(), 1);
905 assert_eq!(pending_txs[0].id, "tx1");
906
907 let submitted_txs = repo
908 .find_by_status("relayer-1", &[TransactionStatus::Submitted])
909 .await
910 .unwrap();
911 assert_eq!(submitted_txs.len(), 1);
912 assert_eq!(submitted_txs[0].id, "tx2");
913
914 let multiple_status_txs = repo
916 .find_by_status(
917 "relayer-1",
918 &[TransactionStatus::Pending, TransactionStatus::Submitted],
919 )
920 .await
921 .unwrap();
922 assert_eq!(multiple_status_txs.len(), 2);
923
924 let relayer2_pending = repo
926 .find_by_status("relayer-2", &[TransactionStatus::Pending])
927 .await
928 .unwrap();
929 assert_eq!(relayer2_pending.len(), 1);
930 assert_eq!(relayer2_pending[0].id, "tx3");
931
932 let no_txs = repo
934 .find_by_status("non-existent", &[TransactionStatus::Pending])
935 .await
936 .unwrap();
937 assert_eq!(no_txs.len(), 0);
938 }
939
940 #[tokio::test]
941 async fn test_find_by_status_sorted_by_created_at() {
942 let repo = InMemoryTransactionRepository::new();
943
944 let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
946 let mut tx = create_test_transaction_pending_state(id);
947 tx.created_at = timestamp.to_string();
948 tx.status = TransactionStatus::Pending;
949 tx
950 };
951
952 let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); repo.create(tx3.clone()).await.unwrap();
959 repo.create(tx1.clone()).await.unwrap();
960 repo.create(tx2.clone()).await.unwrap();
961
962 let result = repo
964 .find_by_status("relayer-1", &[TransactionStatus::Pending])
965 .await
966 .unwrap();
967
968 assert_eq!(result.len(), 3);
970 assert_eq!(result[0].id, "tx3"); assert_eq!(result[1].id, "tx2"); assert_eq!(result[2].id, "tx1"); assert_eq!(result[0].created_at, "2025-01-27T17:00:00.000000+00:00");
976 assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
977 assert_eq!(result[2].created_at, "2025-01-27T15:00:00.000000+00:00");
978 }
979
980 #[tokio::test]
981 async fn test_has_entries() {
982 let repo = InMemoryTransactionRepository::new();
983 assert!(!repo.has_entries().await.unwrap());
984
985 let tx = create_test_transaction("test");
986 repo.create(tx.clone()).await.unwrap();
987
988 assert!(repo.has_entries().await.unwrap());
989 }
990
991 #[tokio::test]
992 async fn test_drop_all_entries() {
993 let repo = InMemoryTransactionRepository::new();
994 let tx = create_test_transaction("test");
995 repo.create(tx.clone()).await.unwrap();
996
997 assert!(repo.has_entries().await.unwrap());
998
999 repo.drop_all_entries().await.unwrap();
1000 assert!(!repo.has_entries().await.unwrap());
1001 }
1002
1003 #[tokio::test]
1006 async fn test_update_status_sets_delete_at_for_final_statuses() {
1007 let _lock = ENV_MUTEX.lock().await;
1008
1009 use chrono::{DateTime, Duration, Utc};
1010 use std::env;
1011
1012 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1014
1015 let repo = InMemoryTransactionRepository::new();
1016
1017 let final_statuses = [
1018 TransactionStatus::Canceled,
1019 TransactionStatus::Confirmed,
1020 TransactionStatus::Failed,
1021 TransactionStatus::Expired,
1022 ];
1023
1024 for (i, status) in final_statuses.iter().enumerate() {
1025 let tx_id = format!("test-final-{}", i);
1026 let tx = create_test_transaction_pending_state(&tx_id);
1027
1028 assert!(tx.delete_at.is_none());
1030
1031 repo.create(tx).await.unwrap();
1032
1033 let before_update = Utc::now();
1034
1035 let updated = repo
1037 .update_status(tx_id.clone(), status.clone())
1038 .await
1039 .unwrap();
1040
1041 assert!(
1043 updated.delete_at.is_some(),
1044 "delete_at should be set for status: {:?}",
1045 status
1046 );
1047
1048 let delete_at_str = updated.delete_at.unwrap();
1050 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1051 .expect("delete_at should be valid RFC3339")
1052 .with_timezone(&Utc);
1053
1054 let duration_from_before = delete_at.signed_duration_since(before_update);
1055 let expected_duration = Duration::hours(6);
1056 let tolerance = Duration::minutes(5);
1057
1058 assert!(
1059 duration_from_before >= expected_duration - tolerance &&
1060 duration_from_before <= expected_duration + tolerance,
1061 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1062 status, duration_from_before
1063 );
1064 }
1065
1066 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1068 }
1069
1070 #[tokio::test]
1071 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1072 let _lock = ENV_MUTEX.lock().await;
1073
1074 use std::env;
1075
1076 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1077
1078 let repo = InMemoryTransactionRepository::new();
1079
1080 let non_final_statuses = [
1081 TransactionStatus::Pending,
1082 TransactionStatus::Sent,
1083 TransactionStatus::Submitted,
1084 TransactionStatus::Mined,
1085 ];
1086
1087 for (i, status) in non_final_statuses.iter().enumerate() {
1088 let tx_id = format!("test-non-final-{}", i);
1089 let tx = create_test_transaction_pending_state(&tx_id);
1090
1091 repo.create(tx).await.unwrap();
1092
1093 let updated = repo
1095 .update_status(tx_id.clone(), status.clone())
1096 .await
1097 .unwrap();
1098
1099 assert!(
1101 updated.delete_at.is_none(),
1102 "delete_at should NOT be set for status: {:?}",
1103 status
1104 );
1105 }
1106
1107 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1109 }
1110
1111 #[tokio::test]
1112 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1113 let _lock = ENV_MUTEX.lock().await;
1114
1115 use chrono::{DateTime, Duration, Utc};
1116 use std::env;
1117
1118 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1119
1120 let repo = InMemoryTransactionRepository::new();
1121 let tx = create_test_transaction_pending_state("test-partial-final");
1122
1123 repo.create(tx).await.unwrap();
1124
1125 let before_update = Utc::now();
1126
1127 let update = TransactionUpdateRequest {
1129 status: Some(TransactionStatus::Confirmed),
1130 status_reason: Some("Transaction completed".to_string()),
1131 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1132 ..Default::default()
1133 };
1134
1135 let updated = repo
1136 .partial_update("test-partial-final".to_string(), update)
1137 .await
1138 .unwrap();
1139
1140 assert!(
1142 updated.delete_at.is_some(),
1143 "delete_at should be set when updating to Confirmed status"
1144 );
1145
1146 let delete_at_str = updated.delete_at.unwrap();
1148 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1149 .expect("delete_at should be valid RFC3339")
1150 .with_timezone(&Utc);
1151
1152 let duration_from_before = delete_at.signed_duration_since(before_update);
1153 let expected_duration = Duration::hours(8);
1154 let tolerance = Duration::minutes(5);
1155
1156 assert!(
1157 duration_from_before >= expected_duration - tolerance
1158 && duration_from_before <= expected_duration + tolerance,
1159 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1160 duration_from_before
1161 );
1162
1163 assert_eq!(updated.status, TransactionStatus::Confirmed);
1165 assert_eq!(
1166 updated.status_reason,
1167 Some("Transaction completed".to_string())
1168 );
1169 assert_eq!(
1170 updated.confirmed_at,
1171 Some("2023-01-01T12:05:00Z".to_string())
1172 );
1173
1174 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1176 }
1177
1178 #[tokio::test]
1179 async fn test_update_status_preserves_existing_delete_at() {
1180 let _lock = ENV_MUTEX.lock().await;
1181
1182 use std::env;
1183
1184 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1185
1186 let repo = InMemoryTransactionRepository::new();
1187 let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1188
1189 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1191 tx.delete_at = Some(existing_delete_at.clone());
1192
1193 repo.create(tx).await.unwrap();
1194
1195 let updated = repo
1197 .update_status(
1198 "test-preserve-delete-at".to_string(),
1199 TransactionStatus::Confirmed,
1200 )
1201 .await
1202 .unwrap();
1203
1204 assert_eq!(
1206 updated.delete_at,
1207 Some(existing_delete_at),
1208 "Existing delete_at should be preserved when updating to final status"
1209 );
1210
1211 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1213 }
1214
1215 #[tokio::test]
1216 async fn test_partial_update_without_status_change_preserves_delete_at() {
1217 let _lock = ENV_MUTEX.lock().await;
1218
1219 use std::env;
1220
1221 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1222
1223 let repo = InMemoryTransactionRepository::new();
1224 let tx = create_test_transaction_pending_state("test-preserve-no-status");
1225
1226 repo.create(tx).await.unwrap();
1227
1228 let updated1 = repo
1230 .update_status(
1231 "test-preserve-no-status".to_string(),
1232 TransactionStatus::Confirmed,
1233 )
1234 .await
1235 .unwrap();
1236
1237 assert!(updated1.delete_at.is_some());
1238 let original_delete_at = updated1.delete_at.clone();
1239
1240 let update = TransactionUpdateRequest {
1242 status: None, status_reason: Some("Updated reason".to_string()),
1244 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1245 ..Default::default()
1246 };
1247
1248 let updated2 = repo
1249 .partial_update("test-preserve-no-status".to_string(), update)
1250 .await
1251 .unwrap();
1252
1253 assert_eq!(
1255 updated2.delete_at, original_delete_at,
1256 "delete_at should be preserved when status is not updated"
1257 );
1258
1259 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1262 assert_eq!(
1263 updated2.confirmed_at,
1264 Some("2023-01-01T12:10:00Z".to_string())
1265 );
1266
1267 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1269 }
1270
1271 #[tokio::test]
1272 async fn test_update_status_multiple_updates_idempotent() {
1273 let _lock = ENV_MUTEX.lock().await;
1274
1275 use std::env;
1276
1277 env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1278
1279 let repo = InMemoryTransactionRepository::new();
1280 let tx = create_test_transaction_pending_state("test-idempotent");
1281
1282 repo.create(tx).await.unwrap();
1283
1284 let updated1 = repo
1286 .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1287 .await
1288 .unwrap();
1289
1290 assert!(updated1.delete_at.is_some());
1291 let first_delete_at = updated1.delete_at.clone();
1292
1293 let updated2 = repo
1295 .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1296 .await
1297 .unwrap();
1298
1299 assert_eq!(
1301 updated2.delete_at, first_delete_at,
1302 "delete_at should not change on subsequent final status updates"
1303 );
1304
1305 assert_eq!(updated2.status, TransactionStatus::Failed);
1307
1308 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1310 }
1311}