openzeppelin_relayer/repositories/transaction/
transaction_in_memory.rs

1//! This module defines an in-memory transaction repository for managing
2//! transaction data. It provides asynchronous methods for creating, retrieving,
3//! updating, and deleting transactions, as well as querying transactions by
4//! various criteria such as relayer ID, status, and nonce. The repository
5//! is implemented using a `Mutex`-protected `HashMap` to store transaction
6//! data, ensuring thread-safe access in an asynchronous context.
7use crate::{
8    models::{
9        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10    },
11    repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
19#[derive(Debug)]
20pub struct InMemoryTransactionRepository {
21    store: Mutex<HashMap<String, TransactionRepoModel>>,
22}
23
24impl Clone for InMemoryTransactionRepository {
25    fn clone(&self) -> Self {
26        // Try to get the current data, or use empty HashMap if lock fails
27        let data = self
28            .store
29            .try_lock()
30            .map(|guard| guard.clone())
31            .unwrap_or_else(|_| HashMap::new());
32
33        Self {
34            store: Mutex::new(data),
35        }
36    }
37}
38
39impl InMemoryTransactionRepository {
40    pub fn new() -> Self {
41        Self {
42            store: Mutex::new(HashMap::new()),
43        }
44    }
45
46    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47        Ok(lock.lock().await)
48    }
49}
50
51// Implement both traits for InMemoryTransactionRepository
52
53#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55    async fn create(
56        &self,
57        tx: TransactionRepoModel,
58    ) -> Result<TransactionRepoModel, RepositoryError> {
59        let mut store = Self::acquire_lock(&self.store).await?;
60        if store.contains_key(&tx.id) {
61            return Err(RepositoryError::ConstraintViolation(format!(
62                "Transaction with ID {} already exists",
63                tx.id
64            )));
65        }
66        store.insert(tx.id.clone(), tx.clone());
67        Ok(tx)
68    }
69
70    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71        let store = Self::acquire_lock(&self.store).await?;
72        store
73            .get(&id)
74            .cloned()
75            .ok_or_else(|| RepositoryError::NotFound(format!("Transaction with ID {id} not found")))
76    }
77
78    #[allow(clippy::map_entry)]
79    async fn update(
80        &self,
81        id: String,
82        tx: TransactionRepoModel,
83    ) -> Result<TransactionRepoModel, RepositoryError> {
84        let mut store = Self::acquire_lock(&self.store).await?;
85        if store.contains_key(&id) {
86            let mut updated_tx = tx;
87            updated_tx.id = id.clone();
88            store.insert(id, updated_tx.clone());
89            Ok(updated_tx)
90        } else {
91            Err(RepositoryError::NotFound(format!(
92                "Transaction with ID {id} not found"
93            )))
94        }
95    }
96
97    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98        let mut store = Self::acquire_lock(&self.store).await?;
99        if store.remove(&id).is_some() {
100            Ok(())
101        } else {
102            Err(RepositoryError::NotFound(format!(
103                "Transaction with ID {id} not found"
104            )))
105        }
106    }
107
108    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
109        let store = Self::acquire_lock(&self.store).await?;
110        Ok(store.values().cloned().collect())
111    }
112
113    async fn list_paginated(
114        &self,
115        query: PaginationQuery,
116    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
117        let total = self.count().await?;
118        let start = ((query.page - 1) * query.per_page) as usize;
119        let store = Self::acquire_lock(&self.store).await?;
120        let items: Vec<TransactionRepoModel> = store
121            .values()
122            .skip(start)
123            .take(query.per_page as usize)
124            .cloned()
125            .collect();
126
127        Ok(PaginatedResult {
128            items,
129            total: total as u64,
130            page: query.page,
131            per_page: query.per_page,
132        })
133    }
134
135    async fn count(&self) -> Result<usize, RepositoryError> {
136        let store = Self::acquire_lock(&self.store).await?;
137        Ok(store.len())
138    }
139
140    async fn has_entries(&self) -> Result<bool, RepositoryError> {
141        let store = Self::acquire_lock(&self.store).await?;
142        Ok(!store.is_empty())
143    }
144
145    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
146        let mut store = Self::acquire_lock(&self.store).await?;
147        store.clear();
148        Ok(())
149    }
150}
151
152#[async_trait]
153impl TransactionRepository for InMemoryTransactionRepository {
154    async fn find_by_relayer_id(
155        &self,
156        relayer_id: &str,
157        query: PaginationQuery,
158    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
159        let store = Self::acquire_lock(&self.store).await?;
160        let filtered: Vec<TransactionRepoModel> = store
161            .values()
162            .filter(|tx| tx.relayer_id == relayer_id)
163            .cloned()
164            .collect();
165
166        let total = filtered.len() as u64;
167
168        if total == 0 {
169            return Ok(PaginatedResult::<TransactionRepoModel> {
170                items: vec![],
171                total: 0,
172                page: query.page,
173                per_page: query.per_page,
174            });
175        }
176
177        let start = ((query.page - 1) * query.per_page) as usize;
178
179        // Sort and paginate (newest first)
180        let items = filtered
181            .into_iter()
182            .sorted_by(|a, b| b.created_at.cmp(&a.created_at)) // Sort by created_at descending (newest first)
183            .skip(start)
184            .take(query.per_page as usize)
185            .collect();
186
187        Ok(PaginatedResult {
188            items,
189            total,
190            page: query.page,
191            per_page: query.per_page,
192        })
193    }
194
195    async fn find_by_status(
196        &self,
197        relayer_id: &str,
198        statuses: &[TransactionStatus],
199    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
200        let store = Self::acquire_lock(&self.store).await?;
201        let filtered: Vec<TransactionRepoModel> = store
202            .values()
203            .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
204            .cloned()
205            .collect();
206
207        // Sort by created_at (newest first)
208        let sorted = filtered
209            .into_iter()
210            .sorted_by(|a, b| b.created_at.cmp(&a.created_at))
211            .collect();
212
213        Ok(sorted)
214    }
215
216    async fn find_by_nonce(
217        &self,
218        relayer_id: &str,
219        nonce: u64,
220    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
221        let store = Self::acquire_lock(&self.store).await?;
222        let filtered: Vec<TransactionRepoModel> = store
223            .values()
224            .filter(|tx| {
225                tx.relayer_id == relayer_id
226                    && match &tx.network_data {
227                        NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
228                        _ => false,
229                    }
230            })
231            .cloned()
232            .collect();
233
234        Ok(filtered.into_iter().next())
235    }
236
237    async fn update_status(
238        &self,
239        tx_id: String,
240        status: TransactionStatus,
241    ) -> Result<TransactionRepoModel, RepositoryError> {
242        let update = TransactionUpdateRequest {
243            status: Some(status),
244            ..Default::default()
245        };
246        self.partial_update(tx_id, update).await
247    }
248
249    async fn partial_update(
250        &self,
251        tx_id: String,
252        update: TransactionUpdateRequest,
253    ) -> Result<TransactionRepoModel, RepositoryError> {
254        let mut store = Self::acquire_lock(&self.store).await?;
255
256        if let Some(tx) = store.get_mut(&tx_id) {
257            // Apply partial updates using the model's business logic
258            tx.apply_partial_update(update);
259            Ok(tx.clone())
260        } else {
261            Err(RepositoryError::NotFound(format!(
262                "Transaction with ID {tx_id} not found"
263            )))
264        }
265    }
266
267    async fn update_network_data(
268        &self,
269        tx_id: String,
270        network_data: NetworkTransactionData,
271    ) -> Result<TransactionRepoModel, RepositoryError> {
272        let mut tx = self.get_by_id(tx_id.clone()).await?;
273        tx.network_data = network_data;
274        self.update(tx_id, tx).await
275    }
276
277    async fn set_sent_at(
278        &self,
279        tx_id: String,
280        sent_at: String,
281    ) -> Result<TransactionRepoModel, RepositoryError> {
282        let mut tx = self.get_by_id(tx_id.clone()).await?;
283        tx.sent_at = Some(sent_at);
284        self.update(tx_id, tx).await
285    }
286
287    async fn set_confirmed_at(
288        &self,
289        tx_id: String,
290        confirmed_at: String,
291    ) -> Result<TransactionRepoModel, RepositoryError> {
292        let mut tx = self.get_by_id(tx_id.clone()).await?;
293        tx.confirmed_at = Some(confirmed_at);
294        self.update(tx_id, tx).await
295    }
296}
297
298impl Default for InMemoryTransactionRepository {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
307    use lazy_static::lazy_static;
308    use std::str::FromStr;
309
310    use crate::models::U256;
311
312    use super::*;
313
314    use tokio::sync::Mutex;
315
316    lazy_static! {
317        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
318    }
319    // Helper function to create test transactions
320    fn create_test_transaction(id: &str) -> TransactionRepoModel {
321        TransactionRepoModel {
322            id: id.to_string(),
323            relayer_id: "relayer-1".to_string(),
324            status: TransactionStatus::Pending,
325            status_reason: None,
326            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
327            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
328            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
329            valid_until: None,
330            delete_at: None,
331            network_type: NetworkType::Evm,
332            priced_at: None,
333            hashes: vec![],
334            network_data: NetworkTransactionData::Evm(EvmTransactionData {
335                gas_price: Some(1000000000),
336                gas_limit: Some(21000),
337                nonce: Some(1),
338                value: U256::from_str("1000000000000000000").unwrap(),
339                data: Some("0x".to_string()),
340                from: "0xSender".to_string(),
341                to: Some("0xRecipient".to_string()),
342                chain_id: 1,
343                signature: None,
344                hash: Some(format!("0x{}", id)),
345                speed: Some(Speed::Fast),
346                max_fee_per_gas: None,
347                max_priority_fee_per_gas: None,
348                raw: None,
349            }),
350            noop_count: None,
351            is_canceled: Some(false),
352        }
353    }
354
355    fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
356        TransactionRepoModel {
357            id: id.to_string(),
358            relayer_id: "relayer-1".to_string(),
359            status: TransactionStatus::Pending,
360            status_reason: None,
361            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
362            sent_at: None,
363            confirmed_at: None,
364            valid_until: None,
365            delete_at: None,
366            network_type: NetworkType::Evm,
367            priced_at: None,
368            hashes: vec![],
369            network_data: NetworkTransactionData::Evm(EvmTransactionData {
370                gas_price: Some(1000000000),
371                gas_limit: Some(21000),
372                nonce: Some(1),
373                value: U256::from_str("1000000000000000000").unwrap(),
374                data: Some("0x".to_string()),
375                from: "0xSender".to_string(),
376                to: Some("0xRecipient".to_string()),
377                chain_id: 1,
378                signature: None,
379                hash: Some(format!("0x{}", id)),
380                speed: Some(Speed::Fast),
381                max_fee_per_gas: None,
382                max_priority_fee_per_gas: None,
383                raw: None,
384            }),
385            noop_count: None,
386            is_canceled: Some(false),
387        }
388    }
389
390    #[tokio::test]
391    async fn test_create_transaction() {
392        let repo = InMemoryTransactionRepository::new();
393        let tx = create_test_transaction("test-1");
394
395        let result = repo.create(tx.clone()).await.unwrap();
396        assert_eq!(result.id, tx.id);
397        assert_eq!(repo.count().await.unwrap(), 1);
398    }
399
400    #[tokio::test]
401    async fn test_get_transaction() {
402        let repo = InMemoryTransactionRepository::new();
403        let tx = create_test_transaction("test-1");
404
405        repo.create(tx.clone()).await.unwrap();
406        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
407        if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
408            if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
409                assert_eq!(stored_data.hash, tx_data.hash);
410            }
411        }
412    }
413
414    #[tokio::test]
415    async fn test_update_transaction() {
416        let repo = InMemoryTransactionRepository::new();
417        let mut tx = create_test_transaction("test-1");
418
419        repo.create(tx.clone()).await.unwrap();
420        tx.status = TransactionStatus::Confirmed;
421
422        let updated = repo.update("test-1".to_string(), tx).await.unwrap();
423        assert!(matches!(updated.status, TransactionStatus::Confirmed));
424    }
425
426    #[tokio::test]
427    async fn test_delete_transaction() {
428        let repo = InMemoryTransactionRepository::new();
429        let tx = create_test_transaction("test-1");
430
431        repo.create(tx).await.unwrap();
432        repo.delete_by_id("test-1".to_string()).await.unwrap();
433
434        let result = repo.get_by_id("test-1".to_string()).await;
435        assert!(result.is_err());
436    }
437
438    #[tokio::test]
439    async fn test_list_all_transactions() {
440        let repo = InMemoryTransactionRepository::new();
441        let tx1 = create_test_transaction("test-1");
442        let tx2 = create_test_transaction("test-2");
443
444        repo.create(tx1).await.unwrap();
445        repo.create(tx2).await.unwrap();
446
447        let transactions = repo.list_all().await.unwrap();
448        assert_eq!(transactions.len(), 2);
449    }
450
451    #[tokio::test]
452    async fn test_count_transactions() {
453        let repo = InMemoryTransactionRepository::new();
454        let tx = create_test_transaction("test-1");
455
456        assert_eq!(repo.count().await.unwrap(), 0);
457        repo.create(tx).await.unwrap();
458        assert_eq!(repo.count().await.unwrap(), 1);
459    }
460
461    #[tokio::test]
462    async fn test_get_nonexistent_transaction() {
463        let repo = InMemoryTransactionRepository::new();
464        let result = repo.get_by_id("nonexistent".to_string()).await;
465        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
466    }
467
468    #[tokio::test]
469    async fn test_duplicate_transaction_creation() {
470        let repo = InMemoryTransactionRepository::new();
471        let tx = create_test_transaction("test-1");
472
473        repo.create(tx.clone()).await.unwrap();
474        let result = repo.create(tx).await;
475
476        assert!(matches!(
477            result,
478            Err(RepositoryError::ConstraintViolation(_))
479        ));
480    }
481
482    #[tokio::test]
483    async fn test_update_nonexistent_transaction() {
484        let repo = InMemoryTransactionRepository::new();
485        let tx = create_test_transaction("test-1");
486
487        let result = repo.update("nonexistent".to_string(), tx).await;
488        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
489    }
490
491    #[tokio::test]
492    async fn test_partial_update() {
493        let repo = InMemoryTransactionRepository::new();
494        let tx = create_test_transaction_pending_state("test-tx-id");
495        repo.create(tx.clone()).await.unwrap();
496
497        // Test updating only status
498        let update1 = TransactionUpdateRequest {
499            status: Some(TransactionStatus::Sent),
500            status_reason: None,
501            sent_at: None,
502            confirmed_at: None,
503            network_data: None,
504            hashes: None,
505            priced_at: None,
506            noop_count: None,
507            is_canceled: None,
508            delete_at: None,
509        };
510        let updated_tx1 = repo
511            .partial_update("test-tx-id".to_string(), update1)
512            .await
513            .unwrap();
514        assert_eq!(updated_tx1.status, TransactionStatus::Sent);
515        assert_eq!(updated_tx1.sent_at, None);
516
517        // Test updating multiple fields
518        let update2 = TransactionUpdateRequest {
519            status: Some(TransactionStatus::Confirmed),
520            status_reason: None,
521            sent_at: Some("2023-01-01T12:00:00Z".to_string()),
522            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
523            network_data: None,
524            hashes: None,
525            priced_at: None,
526            noop_count: None,
527            is_canceled: None,
528            delete_at: None,
529        };
530        let updated_tx2 = repo
531            .partial_update("test-tx-id".to_string(), update2)
532            .await
533            .unwrap();
534        assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
535        assert_eq!(
536            updated_tx2.sent_at,
537            Some("2023-01-01T12:00:00Z".to_string())
538        );
539        assert_eq!(
540            updated_tx2.confirmed_at,
541            Some("2023-01-01T12:05:00Z".to_string())
542        );
543
544        // Test updating non-existent transaction
545        let update3 = TransactionUpdateRequest {
546            status: Some(TransactionStatus::Failed),
547            status_reason: None,
548            sent_at: None,
549            confirmed_at: None,
550            network_data: None,
551            hashes: None,
552            priced_at: None,
553            noop_count: None,
554            is_canceled: None,
555            delete_at: None,
556        };
557        let result = repo
558            .partial_update("non-existent-id".to_string(), update3)
559            .await;
560        assert!(result.is_err());
561        assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
562    }
563
564    #[tokio::test]
565    async fn test_update_status() {
566        let repo = InMemoryTransactionRepository::new();
567        let tx = create_test_transaction("test-1");
568
569        repo.create(tx).await.unwrap();
570
571        // Update status to Confirmed
572        let updated = repo
573            .update_status("test-1".to_string(), TransactionStatus::Confirmed)
574            .await
575            .unwrap();
576
577        // Verify the status was updated in the returned transaction
578        assert_eq!(updated.status, TransactionStatus::Confirmed);
579
580        // Also verify by getting the transaction directly
581        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
582        assert_eq!(stored.status, TransactionStatus::Confirmed);
583
584        // Update status to Failed
585        let updated = repo
586            .update_status("test-1".to_string(), TransactionStatus::Failed)
587            .await
588            .unwrap();
589
590        // Verify the status was updated
591        assert_eq!(updated.status, TransactionStatus::Failed);
592
593        // Verify updating a non-existent transaction
594        let result = repo
595            .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
596            .await;
597        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
598    }
599
600    #[tokio::test]
601    async fn test_list_paginated() {
602        let repo = InMemoryTransactionRepository::new();
603
604        // Create multiple transactions
605        for i in 1..=10 {
606            let tx = create_test_transaction(&format!("test-{}", i));
607            repo.create(tx).await.unwrap();
608        }
609
610        // Test first page with 3 items per page
611        let query = PaginationQuery {
612            page: 1,
613            per_page: 3,
614        };
615        let result = repo.list_paginated(query).await.unwrap();
616        assert_eq!(result.items.len(), 3);
617        assert_eq!(result.total, 10);
618        assert_eq!(result.page, 1);
619        assert_eq!(result.per_page, 3);
620
621        // Test second page with 3 items per page
622        let query = PaginationQuery {
623            page: 2,
624            per_page: 3,
625        };
626        let result = repo.list_paginated(query).await.unwrap();
627        assert_eq!(result.items.len(), 3);
628        assert_eq!(result.total, 10);
629        assert_eq!(result.page, 2);
630        assert_eq!(result.per_page, 3);
631
632        // Test page with fewer items than per_page
633        let query = PaginationQuery {
634            page: 4,
635            per_page: 3,
636        };
637        let result = repo.list_paginated(query).await.unwrap();
638        assert_eq!(result.items.len(), 1);
639        assert_eq!(result.total, 10);
640        assert_eq!(result.page, 4);
641        assert_eq!(result.per_page, 3);
642
643        // Test empty page (beyond total items)
644        let query = PaginationQuery {
645            page: 5,
646            per_page: 3,
647        };
648        let result = repo.list_paginated(query).await.unwrap();
649        assert_eq!(result.items.len(), 0);
650        assert_eq!(result.total, 10);
651    }
652
653    #[tokio::test]
654    async fn test_find_by_nonce() {
655        let repo = InMemoryTransactionRepository::new();
656
657        // Create transactions with different nonces
658        let tx1 = create_test_transaction("test-1");
659
660        let mut tx2 = create_test_transaction("test-2");
661        if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
662            data.nonce = Some(2);
663        }
664
665        let mut tx3 = create_test_transaction("test-3");
666        tx3.relayer_id = "relayer-2".to_string();
667        if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
668            data.nonce = Some(1);
669        }
670
671        repo.create(tx1).await.unwrap();
672        repo.create(tx2).await.unwrap();
673        repo.create(tx3).await.unwrap();
674
675        // Test finding transaction with specific relayer_id and nonce
676        let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
677        assert!(result.is_some());
678        assert_eq!(result.as_ref().unwrap().id, "test-1");
679
680        // Test finding transaction with a different nonce
681        let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
682        assert!(result.is_some());
683        assert_eq!(result.as_ref().unwrap().id, "test-2");
684
685        // Test finding transaction from a different relayer
686        let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
687        assert!(result.is_some());
688        assert_eq!(result.as_ref().unwrap().id, "test-3");
689
690        // Test finding transaction that doesn't exist
691        let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
692        assert!(result.is_none());
693    }
694
695    #[tokio::test]
696    async fn test_update_network_data() {
697        let repo = InMemoryTransactionRepository::new();
698        let tx = create_test_transaction("test-1");
699
700        repo.create(tx.clone()).await.unwrap();
701
702        // Create new network data with updated values
703        let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
704            gas_price: Some(2000000000),
705            gas_limit: Some(30000),
706            nonce: Some(2),
707            value: U256::from_str("2000000000000000000").unwrap(),
708            data: Some("0xUpdated".to_string()),
709            from: "0xSender".to_string(),
710            to: Some("0xRecipient".to_string()),
711            chain_id: 1,
712            signature: None,
713            hash: Some("0xUpdated".to_string()),
714            raw: None,
715            speed: None,
716            max_fee_per_gas: None,
717            max_priority_fee_per_gas: None,
718        });
719
720        let updated = repo
721            .update_network_data("test-1".to_string(), updated_network_data)
722            .await
723            .unwrap();
724
725        // Verify the network data was updated
726        if let NetworkTransactionData::Evm(data) = &updated.network_data {
727            assert_eq!(data.gas_price, Some(2000000000));
728            assert_eq!(data.gas_limit, Some(30000));
729            assert_eq!(data.nonce, Some(2));
730            assert_eq!(data.hash, Some("0xUpdated".to_string()));
731            assert_eq!(data.data, Some("0xUpdated".to_string()));
732        } else {
733            panic!("Expected EVM network data");
734        }
735    }
736
737    #[tokio::test]
738    async fn test_set_sent_at() {
739        let repo = InMemoryTransactionRepository::new();
740        let tx = create_test_transaction("test-1");
741
742        repo.create(tx).await.unwrap();
743
744        // Updated sent_at timestamp
745        let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
746
747        let updated = repo
748            .set_sent_at("test-1".to_string(), new_sent_at.clone())
749            .await
750            .unwrap();
751
752        // Verify the sent_at timestamp was updated
753        assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
754
755        // Also verify by getting the transaction directly
756        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
757        assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
758    }
759
760    #[tokio::test]
761    async fn test_set_confirmed_at() {
762        let repo = InMemoryTransactionRepository::new();
763        let tx = create_test_transaction("test-1");
764
765        repo.create(tx).await.unwrap();
766
767        // Updated confirmed_at timestamp
768        let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
769
770        let updated = repo
771            .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
772            .await
773            .unwrap();
774
775        // Verify the confirmed_at timestamp was updated
776        assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
777
778        // Also verify by getting the transaction directly
779        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
780        assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
781    }
782
783    #[tokio::test]
784    async fn test_find_by_relayer_id() {
785        let repo = InMemoryTransactionRepository::new();
786        let tx1 = create_test_transaction("test-1");
787        let tx2 = create_test_transaction("test-2");
788
789        // Create a transaction with a different relayer_id
790        let mut tx3 = create_test_transaction("test-3");
791        tx3.relayer_id = "relayer-2".to_string();
792
793        repo.create(tx1).await.unwrap();
794        repo.create(tx2).await.unwrap();
795        repo.create(tx3).await.unwrap();
796
797        // Test finding transactions for relayer-1
798        let query = PaginationQuery {
799            page: 1,
800            per_page: 10,
801        };
802        let result = repo
803            .find_by_relayer_id("relayer-1", query.clone())
804            .await
805            .unwrap();
806        assert_eq!(result.total, 2);
807        assert_eq!(result.items.len(), 2);
808        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
809
810        // Test finding transactions for relayer-2
811        let result = repo
812            .find_by_relayer_id("relayer-2", query.clone())
813            .await
814            .unwrap();
815        assert_eq!(result.total, 1);
816        assert_eq!(result.items.len(), 1);
817        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
818
819        // Test finding transactions for non-existent relayer
820        let result = repo
821            .find_by_relayer_id("non-existent", query.clone())
822            .await
823            .unwrap();
824        assert_eq!(result.total, 0);
825        assert_eq!(result.items.len(), 0);
826    }
827
828    #[tokio::test]
829    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
830        let repo = InMemoryTransactionRepository::new();
831
832        // Create transactions with different created_at timestamps
833        let mut tx1 = create_test_transaction("test-1");
834        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
835
836        let mut tx2 = create_test_transaction("test-2");
837        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
838
839        let mut tx3 = create_test_transaction("test-3");
840        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
841
842        // Create transactions in non-chronological order to ensure sorting works
843        repo.create(tx2.clone()).await.unwrap(); // Middle first
844        repo.create(tx1.clone()).await.unwrap(); // Oldest second
845        repo.create(tx3.clone()).await.unwrap(); // Newest last
846
847        let query = PaginationQuery {
848            page: 1,
849            per_page: 10,
850        };
851        let result = repo.find_by_relayer_id("relayer-1", query).await.unwrap();
852
853        assert_eq!(result.total, 3);
854        assert_eq!(result.items.len(), 3);
855
856        // Verify transactions are sorted by created_at descending (newest first)
857        assert_eq!(
858            result.items[0].id, "test-3",
859            "First item should be newest (test-3)"
860        );
861        assert_eq!(
862            result.items[0].created_at,
863            "2025-01-27T14:00:00.000000+00:00"
864        );
865
866        assert_eq!(
867            result.items[1].id, "test-2",
868            "Second item should be middle (test-2)"
869        );
870        assert_eq!(
871            result.items[1].created_at,
872            "2025-01-27T12:00:00.000000+00:00"
873        );
874
875        assert_eq!(
876            result.items[2].id, "test-1",
877            "Third item should be oldest (test-1)"
878        );
879        assert_eq!(
880            result.items[2].created_at,
881            "2025-01-27T10:00:00.000000+00:00"
882        );
883    }
884
885    #[tokio::test]
886    async fn test_find_by_status() {
887        let repo = InMemoryTransactionRepository::new();
888        let tx1 = create_test_transaction_pending_state("tx1");
889        let mut tx2 = create_test_transaction_pending_state("tx2");
890        tx2.status = TransactionStatus::Submitted;
891        let mut tx3 = create_test_transaction_pending_state("tx3");
892        tx3.relayer_id = "relayer-2".to_string();
893        tx3.status = TransactionStatus::Pending;
894
895        repo.create(tx1.clone()).await.unwrap();
896        repo.create(tx2.clone()).await.unwrap();
897        repo.create(tx3.clone()).await.unwrap();
898
899        // Test finding by single status
900        let pending_txs = repo
901            .find_by_status("relayer-1", &[TransactionStatus::Pending])
902            .await
903            .unwrap();
904        assert_eq!(pending_txs.len(), 1);
905        assert_eq!(pending_txs[0].id, "tx1");
906
907        let submitted_txs = repo
908            .find_by_status("relayer-1", &[TransactionStatus::Submitted])
909            .await
910            .unwrap();
911        assert_eq!(submitted_txs.len(), 1);
912        assert_eq!(submitted_txs[0].id, "tx2");
913
914        // Test finding by multiple statuses
915        let multiple_status_txs = repo
916            .find_by_status(
917                "relayer-1",
918                &[TransactionStatus::Pending, TransactionStatus::Submitted],
919            )
920            .await
921            .unwrap();
922        assert_eq!(multiple_status_txs.len(), 2);
923
924        // Test finding for different relayer
925        let relayer2_pending = repo
926            .find_by_status("relayer-2", &[TransactionStatus::Pending])
927            .await
928            .unwrap();
929        assert_eq!(relayer2_pending.len(), 1);
930        assert_eq!(relayer2_pending[0].id, "tx3");
931
932        // Test finding for non-existent relayer
933        let no_txs = repo
934            .find_by_status("non-existent", &[TransactionStatus::Pending])
935            .await
936            .unwrap();
937        assert_eq!(no_txs.len(), 0);
938    }
939
940    #[tokio::test]
941    async fn test_find_by_status_sorted_by_created_at() {
942        let repo = InMemoryTransactionRepository::new();
943
944        // Helper function to create transaction with custom created_at timestamp
945        let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
946            let mut tx = create_test_transaction_pending_state(id);
947            tx.created_at = timestamp.to_string();
948            tx.status = TransactionStatus::Pending;
949            tx
950        };
951
952        // Create transactions with different timestamps (out of chronological order)
953        let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); // Latest
954        let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); // Earliest
955        let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); // Middle
956
957        // Create them in reverse chronological order to test sorting
958        repo.create(tx3.clone()).await.unwrap();
959        repo.create(tx1.clone()).await.unwrap();
960        repo.create(tx2.clone()).await.unwrap();
961
962        // Find by status
963        let result = repo
964            .find_by_status("relayer-1", &[TransactionStatus::Pending])
965            .await
966            .unwrap();
967
968        // Verify they are sorted by created_at (newest first)
969        assert_eq!(result.len(), 3);
970        assert_eq!(result[0].id, "tx3"); // Earliest
971        assert_eq!(result[1].id, "tx2"); // Middle
972        assert_eq!(result[2].id, "tx1"); // Latest
973
974        // Verify the timestamps are in descending order
975        assert_eq!(result[0].created_at, "2025-01-27T17:00:00.000000+00:00");
976        assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
977        assert_eq!(result[2].created_at, "2025-01-27T15:00:00.000000+00:00");
978    }
979
980    #[tokio::test]
981    async fn test_has_entries() {
982        let repo = InMemoryTransactionRepository::new();
983        assert!(!repo.has_entries().await.unwrap());
984
985        let tx = create_test_transaction("test");
986        repo.create(tx.clone()).await.unwrap();
987
988        assert!(repo.has_entries().await.unwrap());
989    }
990
991    #[tokio::test]
992    async fn test_drop_all_entries() {
993        let repo = InMemoryTransactionRepository::new();
994        let tx = create_test_transaction("test");
995        repo.create(tx.clone()).await.unwrap();
996
997        assert!(repo.has_entries().await.unwrap());
998
999        repo.drop_all_entries().await.unwrap();
1000        assert!(!repo.has_entries().await.unwrap());
1001    }
1002
1003    // Tests for delete_at field setting on final status updates
1004
1005    #[tokio::test]
1006    async fn test_update_status_sets_delete_at_for_final_statuses() {
1007        let _lock = ENV_MUTEX.lock().await;
1008
1009        use chrono::{DateTime, Duration, Utc};
1010        use std::env;
1011
1012        // Use a unique test environment variable to avoid conflicts
1013        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1014
1015        let repo = InMemoryTransactionRepository::new();
1016
1017        let final_statuses = [
1018            TransactionStatus::Canceled,
1019            TransactionStatus::Confirmed,
1020            TransactionStatus::Failed,
1021            TransactionStatus::Expired,
1022        ];
1023
1024        for (i, status) in final_statuses.iter().enumerate() {
1025            let tx_id = format!("test-final-{}", i);
1026            let tx = create_test_transaction_pending_state(&tx_id);
1027
1028            // Ensure transaction has no delete_at initially
1029            assert!(tx.delete_at.is_none());
1030
1031            repo.create(tx).await.unwrap();
1032
1033            let before_update = Utc::now();
1034
1035            // Update to final status
1036            let updated = repo
1037                .update_status(tx_id.clone(), status.clone())
1038                .await
1039                .unwrap();
1040
1041            // Should have delete_at set
1042            assert!(
1043                updated.delete_at.is_some(),
1044                "delete_at should be set for status: {:?}",
1045                status
1046            );
1047
1048            // Verify the timestamp is reasonable (approximately 6 hours from now)
1049            let delete_at_str = updated.delete_at.unwrap();
1050            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1051                .expect("delete_at should be valid RFC3339")
1052                .with_timezone(&Utc);
1053
1054            let duration_from_before = delete_at.signed_duration_since(before_update);
1055            let expected_duration = Duration::hours(6);
1056            let tolerance = Duration::minutes(5);
1057
1058            assert!(
1059                duration_from_before >= expected_duration - tolerance &&
1060                duration_from_before <= expected_duration + tolerance,
1061                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1062                status, duration_from_before
1063            );
1064        }
1065
1066        // Cleanup
1067        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1068    }
1069
1070    #[tokio::test]
1071    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1072        let _lock = ENV_MUTEX.lock().await;
1073
1074        use std::env;
1075
1076        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1077
1078        let repo = InMemoryTransactionRepository::new();
1079
1080        let non_final_statuses = [
1081            TransactionStatus::Pending,
1082            TransactionStatus::Sent,
1083            TransactionStatus::Submitted,
1084            TransactionStatus::Mined,
1085        ];
1086
1087        for (i, status) in non_final_statuses.iter().enumerate() {
1088            let tx_id = format!("test-non-final-{}", i);
1089            let tx = create_test_transaction_pending_state(&tx_id);
1090
1091            repo.create(tx).await.unwrap();
1092
1093            // Update to non-final status
1094            let updated = repo
1095                .update_status(tx_id.clone(), status.clone())
1096                .await
1097                .unwrap();
1098
1099            // Should NOT have delete_at set
1100            assert!(
1101                updated.delete_at.is_none(),
1102                "delete_at should NOT be set for status: {:?}",
1103                status
1104            );
1105        }
1106
1107        // Cleanup
1108        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1109    }
1110
1111    #[tokio::test]
1112    async fn test_partial_update_sets_delete_at_for_final_statuses() {
1113        let _lock = ENV_MUTEX.lock().await;
1114
1115        use chrono::{DateTime, Duration, Utc};
1116        use std::env;
1117
1118        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1119
1120        let repo = InMemoryTransactionRepository::new();
1121        let tx = create_test_transaction_pending_state("test-partial-final");
1122
1123        repo.create(tx).await.unwrap();
1124
1125        let before_update = Utc::now();
1126
1127        // Use partial_update to set status to Confirmed (final status)
1128        let update = TransactionUpdateRequest {
1129            status: Some(TransactionStatus::Confirmed),
1130            status_reason: Some("Transaction completed".to_string()),
1131            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1132            ..Default::default()
1133        };
1134
1135        let updated = repo
1136            .partial_update("test-partial-final".to_string(), update)
1137            .await
1138            .unwrap();
1139
1140        // Should have delete_at set
1141        assert!(
1142            updated.delete_at.is_some(),
1143            "delete_at should be set when updating to Confirmed status"
1144        );
1145
1146        // Verify the timestamp is reasonable (approximately 8 hours from now)
1147        let delete_at_str = updated.delete_at.unwrap();
1148        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1149            .expect("delete_at should be valid RFC3339")
1150            .with_timezone(&Utc);
1151
1152        let duration_from_before = delete_at.signed_duration_since(before_update);
1153        let expected_duration = Duration::hours(8);
1154        let tolerance = Duration::minutes(5);
1155
1156        assert!(
1157            duration_from_before >= expected_duration - tolerance
1158                && duration_from_before <= expected_duration + tolerance,
1159            "delete_at should be approximately 8 hours from now. Duration: {:?}",
1160            duration_from_before
1161        );
1162
1163        // Also verify other fields were updated
1164        assert_eq!(updated.status, TransactionStatus::Confirmed);
1165        assert_eq!(
1166            updated.status_reason,
1167            Some("Transaction completed".to_string())
1168        );
1169        assert_eq!(
1170            updated.confirmed_at,
1171            Some("2023-01-01T12:05:00Z".to_string())
1172        );
1173
1174        // Cleanup
1175        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1176    }
1177
1178    #[tokio::test]
1179    async fn test_update_status_preserves_existing_delete_at() {
1180        let _lock = ENV_MUTEX.lock().await;
1181
1182        use std::env;
1183
1184        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1185
1186        let repo = InMemoryTransactionRepository::new();
1187        let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1188
1189        // Set an existing delete_at value
1190        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1191        tx.delete_at = Some(existing_delete_at.clone());
1192
1193        repo.create(tx).await.unwrap();
1194
1195        // Update to final status
1196        let updated = repo
1197            .update_status(
1198                "test-preserve-delete-at".to_string(),
1199                TransactionStatus::Confirmed,
1200            )
1201            .await
1202            .unwrap();
1203
1204        // Should preserve the existing delete_at value
1205        assert_eq!(
1206            updated.delete_at,
1207            Some(existing_delete_at),
1208            "Existing delete_at should be preserved when updating to final status"
1209        );
1210
1211        // Cleanup
1212        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1213    }
1214
1215    #[tokio::test]
1216    async fn test_partial_update_without_status_change_preserves_delete_at() {
1217        let _lock = ENV_MUTEX.lock().await;
1218
1219        use std::env;
1220
1221        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1222
1223        let repo = InMemoryTransactionRepository::new();
1224        let tx = create_test_transaction_pending_state("test-preserve-no-status");
1225
1226        repo.create(tx).await.unwrap();
1227
1228        // First, update to final status to set delete_at
1229        let updated1 = repo
1230            .update_status(
1231                "test-preserve-no-status".to_string(),
1232                TransactionStatus::Confirmed,
1233            )
1234            .await
1235            .unwrap();
1236
1237        assert!(updated1.delete_at.is_some());
1238        let original_delete_at = updated1.delete_at.clone();
1239
1240        // Now update other fields without changing status
1241        let update = TransactionUpdateRequest {
1242            status: None, // No status change
1243            status_reason: Some("Updated reason".to_string()),
1244            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1245            ..Default::default()
1246        };
1247
1248        let updated2 = repo
1249            .partial_update("test-preserve-no-status".to_string(), update)
1250            .await
1251            .unwrap();
1252
1253        // delete_at should be preserved
1254        assert_eq!(
1255            updated2.delete_at, original_delete_at,
1256            "delete_at should be preserved when status is not updated"
1257        );
1258
1259        // Other fields should be updated
1260        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
1261        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1262        assert_eq!(
1263            updated2.confirmed_at,
1264            Some("2023-01-01T12:10:00Z".to_string())
1265        );
1266
1267        // Cleanup
1268        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1269    }
1270
1271    #[tokio::test]
1272    async fn test_update_status_multiple_updates_idempotent() {
1273        let _lock = ENV_MUTEX.lock().await;
1274
1275        use std::env;
1276
1277        env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1278
1279        let repo = InMemoryTransactionRepository::new();
1280        let tx = create_test_transaction_pending_state("test-idempotent");
1281
1282        repo.create(tx).await.unwrap();
1283
1284        // First update to final status
1285        let updated1 = repo
1286            .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1287            .await
1288            .unwrap();
1289
1290        assert!(updated1.delete_at.is_some());
1291        let first_delete_at = updated1.delete_at.clone();
1292
1293        // Second update to another final status
1294        let updated2 = repo
1295            .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1296            .await
1297            .unwrap();
1298
1299        // delete_at should remain the same (idempotent)
1300        assert_eq!(
1301            updated2.delete_at, first_delete_at,
1302            "delete_at should not change on subsequent final status updates"
1303        );
1304
1305        // Status should be updated
1306        assert_eq!(updated2.status, TransactionStatus::Failed);
1307
1308        // Cleanup
1309        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1310    }
1311}