openzeppelin_relayer/repositories/transaction/
transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::models::{
4    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5    TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9    BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use redis::aio::ConnectionManager;
13use redis::AsyncCommands;
14use std::fmt;
15use std::sync::Arc;
16use tracing::{debug, error, warn};
17
18const RELAYER_PREFIX: &str = "relayer";
19const TX_PREFIX: &str = "tx";
20const STATUS_PREFIX: &str = "status";
21const NONCE_PREFIX: &str = "nonce";
22const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
23const RELAYER_LIST_KEY: &str = "relayer_list";
24const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
25
26#[derive(Clone)]
27pub struct RedisTransactionRepository {
28    pub client: Arc<ConnectionManager>,
29    pub key_prefix: String,
30}
31
32impl RedisRepository for RedisTransactionRepository {}
33
34impl RedisTransactionRepository {
35    pub fn new(
36        connection_manager: Arc<ConnectionManager>,
37        key_prefix: String,
38    ) -> Result<Self, RepositoryError> {
39        if key_prefix.is_empty() {
40            return Err(RepositoryError::InvalidData(
41                "Redis key prefix cannot be empty".to_string(),
42            ));
43        }
44
45        Ok(Self {
46            client: connection_manager,
47            key_prefix,
48        })
49    }
50
51    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
52    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
53        format!(
54            "{}:{}:{}:{}:{}",
55            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
56        )
57    }
58
59    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
60    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
61        format!(
62            "{}:{}:{}:{}",
63            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
64        )
65    }
66
67    /// Generate key for relayer status index: relayer:{relayer_id}:status:{status}
68    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
69        format!(
70            "{}:{}:{}:{}:{}",
71            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
72        )
73    }
74
75    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
76    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
77        format!(
78            "{}:{}:{}:{}:{}",
79            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
80        )
81    }
82
83    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
84    fn relayer_list_key(&self) -> String {
85        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
86    }
87
88    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
89    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
90        format!(
91            "{}:{}:{}:{}",
92            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
93        )
94    }
95
96    /// Parse created_at timestamp to score for sorted set (milliseconds since epoch)
97    fn created_at_to_score(&self, created_at: &str) -> f64 {
98        chrono::DateTime::parse_from_rfc3339(created_at)
99            .map(|dt| dt.timestamp_millis() as f64)
100            .unwrap_or_else(|_| {
101                warn!(created_at = %created_at, "failed to parse created_at timestamp, using 0");
102                0.0
103            })
104    }
105
106    /// Batch fetch transactions by IDs using reverse lookup
107    async fn get_transactions_by_ids(
108        &self,
109        ids: &[String],
110    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
111        if ids.is_empty() {
112            debug!("no transaction IDs provided for batch fetch");
113            return Ok(BatchRetrievalResult {
114                results: vec![],
115                failed_ids: vec![],
116            });
117        }
118
119        let mut conn = self.client.as_ref().clone();
120
121        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
122
123        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
124
125        let relayer_ids: Vec<Option<String>> = conn
126            .mget(&reverse_keys)
127            .await
128            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
129
130        let mut tx_keys = Vec::new();
131        let mut valid_ids = Vec::new();
132        let mut failed_ids = Vec::new();
133        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
134            match relayer_id {
135                Some(relayer_id) => {
136                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
137                    valid_ids.push(ids[i].clone());
138                }
139                None => {
140                    warn!(tx_id = %ids[i], "no relayer found for transaction");
141                    failed_ids.push(ids[i].clone());
142                }
143            }
144        }
145
146        if tx_keys.is_empty() {
147            debug!("no valid transactions found for batch fetch");
148            return Ok(BatchRetrievalResult {
149                results: vec![],
150                failed_ids,
151            });
152        }
153
154        debug!(count = %tx_keys.len(), "batch fetching transaction data");
155
156        let values: Vec<Option<String>> = conn
157            .mget(&tx_keys)
158            .await
159            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
160
161        let mut transactions = Vec::new();
162        let mut failed_count = 0;
163        let mut failed_ids = Vec::new();
164        for (i, value) in values.into_iter().enumerate() {
165            match value {
166                Some(json) => {
167                    match self.deserialize_entity::<TransactionRepoModel>(
168                        &json,
169                        &valid_ids[i],
170                        "transaction",
171                    ) {
172                        Ok(tx) => transactions.push(tx),
173                        Err(e) => {
174                            failed_count += 1;
175                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
176                            // Continue processing other transactions
177                        }
178                    }
179                }
180                None => {
181                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
182                    failed_ids.push(valid_ids[i].clone());
183                }
184            }
185        }
186
187        if failed_count > 0 {
188            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
189        }
190
191        debug!(count = %transactions.len(), "successfully fetched transactions");
192        Ok(BatchRetrievalResult {
193            results: transactions,
194            failed_ids,
195        })
196    }
197
198    /// Extract nonce from EVM transaction data
199    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
200        match network_data.get_evm_transaction_data() {
201            Ok(tx_data) => tx_data.nonce,
202            Err(_) => {
203                debug!("no EVM transaction data available for nonce extraction");
204                None
205            }
206        }
207    }
208
209    /// Update indexes atomically with comprehensive error handling
210    async fn update_indexes(
211        &self,
212        tx: &TransactionRepoModel,
213        old_tx: Option<&TransactionRepoModel>,
214    ) -> Result<(), RepositoryError> {
215        let mut conn = self.client.as_ref().clone();
216        let mut pipe = redis::pipe();
217        pipe.atomic();
218
219        debug!(tx_id = %tx.id, "updating indexes for transaction");
220
221        // Add relayer to the global relayer list
222        let relayer_list_key = self.relayer_list_key();
223        pipe.sadd(&relayer_list_key, &tx.relayer_id);
224
225        // Handle status index updates
226        let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
227        pipe.sadd(&new_status_key, &tx.id);
228
229        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
230            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
231            pipe.set(&nonce_key, &tx.id);
232            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
233        }
234
235        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
236        let created_at_score = self.created_at_to_score(&tx.created_at);
237        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
238        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
239        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
240
241        // Remove old indexes if updating
242        if let Some(old) = old_tx {
243            if old.status != tx.status {
244                let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
245                pipe.srem(&old_status_key, &tx.id);
246                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
247            }
248
249            // Handle nonce index cleanup
250            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
251                let new_nonce = self.extract_nonce(&tx.network_data);
252                if Some(old_nonce) != new_nonce {
253                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
254                    pipe.del(&old_nonce_key);
255                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
256                }
257            }
258        }
259
260        // Execute all operations in a single pipeline
261        pipe.exec_async(&mut conn).await.map_err(|e| {
262            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
263            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
264        })?;
265
266        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
267        Ok(())
268    }
269
270    /// Remove all indexes with error recovery
271    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
272        let mut conn = self.client.as_ref().clone();
273        let mut pipe = redis::pipe();
274        pipe.atomic();
275
276        debug!(tx_id = %tx.id, "removing all indexes for transaction");
277
278        // Remove from status index
279        let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
280        pipe.srem(&status_key, &tx.id);
281
282        // Remove nonce index if exists
283        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
284            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
285            pipe.del(&nonce_key);
286            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
287        }
288
289        // Remove from per-relayer sorted set by created_at
290        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
291        pipe.zrem(&relayer_sorted_key, &tx.id);
292        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
293
294        // Remove reverse lookup
295        let reverse_key = self.tx_to_relayer_key(&tx.id);
296        pipe.del(&reverse_key);
297
298        pipe.exec_async(&mut conn).await.map_err(|e| {
299            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
300            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
301        })?;
302
303        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
304        Ok(())
305    }
306
307    /// Migrate old transactions to sorted set if they exist
308    ///
309    /// Remove this migration function in the future
310    async fn check_and_migrate_if_needed(
311        &self,
312        relayer_id: &str,
313    ) -> Result<Option<u64>, RepositoryError> {
314        let mut conn = self.client.as_ref().clone();
315
316        // Quick check: scan for at least one transaction key to see if migration is needed
317        let pattern = format!(
318            "{}:{}:{}:{}:*",
319            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
320        );
321
322        // Scan for keys (need to iterate through cursor to ensure we check all keys)
323        let mut cursor = 0u64;
324        let mut found_any = false;
325        loop {
326            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
327                .cursor_arg(cursor)
328                .arg("MATCH")
329                .arg(&pattern)
330                .query_async(&mut conn)
331                .await
332                .map_err(|e| self.map_redis_error(e, "check_and_migrate_scan_check"))?;
333
334            if !keys.is_empty() {
335                found_any = true;
336                break; // Found at least one key, that's enough to know migration is needed
337            }
338
339            cursor = next_cursor;
340            if cursor == 0 {
341                break; // Finished scanning
342            }
343        }
344
345        if !found_any {
346            // No transactions at all
347            debug!(relayer_id = %relayer_id, "no transactions found for relayer");
348            return Ok(None);
349        }
350
351        // Old transactions exist, migrate them to sorted set
352        debug!(relayer_id = %relayer_id, "sorted set is empty but old transactions found, migrating to sorted set");
353        if let Err(e) = self
354            .migrate_relayer_transactions_to_sorted_set(relayer_id)
355            .await
356        {
357            warn!(relayer_id = %relayer_id, error = %e, "failed to migrate transactions");
358            return Err(e);
359        }
360
361        // Re-check sorted set count after migration
362        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
363        let new_count: u64 = conn
364            .zcard(&relayer_sorted_key)
365            .await
366            .map_err(|e| self.map_redis_error(e, "check_and_migrate_count_after"))?;
367
368        if new_count == 0 {
369            // Migration didn't add anything (maybe all transactions were invalid)
370            debug!(relayer_id = %relayer_id, "migration completed but sorted set still empty");
371            return Ok(None);
372        }
373
374        Ok(Some(new_count))
375    }
376
377    /// Migrate old transactions to the sorted set index (lazy migration)
378    ///
379    /// Remove this migration function in the future
380    async fn migrate_relayer_transactions_to_sorted_set(
381        &self,
382        relayer_id: &str,
383    ) -> Result<usize, RepositoryError> {
384        let mut conn = self.client.as_ref().clone();
385
386        debug!(relayer_id = %relayer_id, "migrating old transactions to sorted set index");
387
388        // Scan for all transaction keys for this relayer
389        let pattern = format!(
390            "{}:{}:{}:{}:*",
391            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
392        );
393        let mut all_tx_ids = Vec::new();
394        let mut cursor = 0;
395
396        loop {
397            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
398                .cursor_arg(cursor)
399                .arg("MATCH")
400                .arg(&pattern)
401                .query_async(&mut conn)
402                .await
403                .map_err(|e| self.map_redis_error(e, "migrate_relayer_transactions_scan"))?;
404
405            // Extract transaction IDs from keys
406            for key in keys {
407                if let Some(tx_id) = key.split(':').next_back() {
408                    all_tx_ids.push(tx_id.to_string());
409                }
410            }
411
412            cursor = next_cursor;
413            if cursor == 0 {
414                break;
415            }
416        }
417
418        if all_tx_ids.is_empty() {
419            debug!(relayer_id = %relayer_id, "no transactions found to migrate");
420            return Ok(0);
421        }
422
423        // Fetch all transactions
424        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
425        let transactions = batch_result.results;
426
427        if transactions.is_empty() {
428            debug!(relayer_id = %relayer_id, "no valid transactions found to migrate");
429            return Ok(0);
430        }
431
432        // Add transactions to sorted set using pipeline
433        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
434        let mut pipe = redis::pipe();
435        pipe.atomic();
436
437        for tx in &transactions {
438            let created_at_score = self.created_at_to_score(&tx.created_at);
439            pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
440        }
441
442        pipe.exec_async(&mut conn)
443            .await
444            .map_err(|e| self.map_redis_error(e, "migrate_relayer_transactions_pipeline"))?;
445
446        let migrated_count = transactions.len();
447        debug!(relayer_id = %relayer_id, count = %migrated_count, "successfully migrated transactions to sorted set");
448
449        Ok(migrated_count)
450    }
451
452    /// Fallback method using SCAN for backward compatibility with old transactions
453    /// that don't have entries in the sorted set index
454    /// Remove this fallback method in the future
455    async fn find_by_relayer_id_fallback(
456        &self,
457        relayer_id: &str,
458        query: PaginationQuery,
459    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
460        let mut conn = self.client.as_ref().clone();
461
462        // Scan for all transaction keys for this relayer
463        let pattern = format!(
464            "{}:{}:{}:{}:*",
465            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
466        );
467        let mut all_tx_ids = Vec::new();
468        let mut cursor = 0;
469
470        loop {
471            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
472                .cursor_arg(cursor)
473                .arg("MATCH")
474                .arg(&pattern)
475                .query_async(&mut conn)
476                .await
477                .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_fallback_scan"))?;
478
479            // Extract transaction IDs from keys
480            for key in keys {
481                if let Some(tx_id) = key.split(':').next_back() {
482                    all_tx_ids.push(tx_id.to_string());
483                }
484            }
485
486            cursor = next_cursor;
487            if cursor == 0 {
488                break;
489            }
490        }
491
492        // Fetch all transactions and sort by created_at (newest first)
493        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
494        let mut transactions = batch_result.results;
495        transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
496
497        let total = transactions.len() as u64;
498        let start = ((query.page - 1) * query.per_page) as usize;
499        let end = (start + query.per_page as usize).min(transactions.len());
500
501        if start >= transactions.len() {
502            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
503            return Ok(PaginatedResult {
504                items: vec![],
505                total,
506                page: query.page,
507                per_page: query.per_page,
508            });
509        }
510
511        let items = transactions[start..end].to_vec();
512
513        debug!(relayer_id = %relayer_id, count = %items.len(), page = %query.page, "successfully fetched transactions for relayer using fallback method");
514
515        Ok(PaginatedResult {
516            items,
517            total,
518            page: query.page,
519            per_page: query.per_page,
520        })
521    }
522}
523
524impl fmt::Debug for RedisTransactionRepository {
525    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526        f.debug_struct("RedisTransactionRepository")
527            .field("client", &"<ConnectionManager>")
528            .field("key_prefix", &self.key_prefix)
529            .finish()
530    }
531}
532
533#[async_trait]
534impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
535    async fn create(
536        &self,
537        entity: TransactionRepoModel,
538    ) -> Result<TransactionRepoModel, RepositoryError> {
539        if entity.id.is_empty() {
540            return Err(RepositoryError::InvalidData(
541                "Transaction ID cannot be empty".to_string(),
542            ));
543        }
544
545        let key = self.tx_key(&entity.relayer_id, &entity.id);
546        let reverse_key = self.tx_to_relayer_key(&entity.id);
547        let mut conn = self.client.as_ref().clone();
548
549        debug!(tx_id = %entity.id, "creating transaction");
550
551        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
552
553        // Check if transaction already exists by checking reverse lookup
554        let existing: Option<String> = conn
555            .get(&reverse_key)
556            .await
557            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
558
559        if existing.is_some() {
560            return Err(RepositoryError::ConstraintViolation(format!(
561                "Transaction with ID {} already exists",
562                entity.id
563            )));
564        }
565
566        // Use atomic pipeline for consistency
567        let mut pipe = redis::pipe();
568        pipe.atomic();
569        pipe.set(&key, &value);
570        pipe.set(&reverse_key, &entity.relayer_id);
571
572        pipe.exec_async(&mut conn)
573            .await
574            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
575
576        // Update indexes separately to handle partial failures gracefully
577        if let Err(e) = self.update_indexes(&entity, None).await {
578            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
579            return Err(e);
580        }
581
582        debug!(tx_id = %entity.id, "successfully created transaction");
583        Ok(entity)
584    }
585
586    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
587        if id.is_empty() {
588            return Err(RepositoryError::InvalidData(
589                "Transaction ID cannot be empty".to_string(),
590            ));
591        }
592
593        let mut conn = self.client.as_ref().clone();
594
595        debug!(tx_id = %id, "fetching transaction");
596
597        let reverse_key = self.tx_to_relayer_key(&id);
598        let relayer_id: Option<String> = conn
599            .get(&reverse_key)
600            .await
601            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
602
603        let relayer_id = match relayer_id {
604            Some(relayer_id) => relayer_id,
605            None => {
606                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
607                return Err(RepositoryError::NotFound(format!(
608                    "Transaction with ID {id} not found"
609                )));
610            }
611        };
612
613        let key = self.tx_key(&relayer_id, &id);
614        let value: Option<String> = conn
615            .get(&key)
616            .await
617            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
618
619        match value {
620            Some(json) => {
621                let tx =
622                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
623                debug!(tx_id = %id, "successfully fetched transaction");
624                Ok(tx)
625            }
626            None => {
627                debug!(tx_id = %id, "transaction not found");
628                Err(RepositoryError::NotFound(format!(
629                    "Transaction with ID {id} not found"
630                )))
631            }
632        }
633    }
634
635    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
636    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
637        let mut conn = self.client.as_ref().clone();
638
639        debug!("fetching all transactions sorted by created_at (newest first)");
640
641        // Get all relayer IDs
642        let relayer_list_key = self.relayer_list_key();
643        let relayer_ids: Vec<String> = conn
644            .smembers(&relayer_list_key)
645            .await
646            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
647
648        debug!(count = %relayer_ids.len(), "found relayers");
649
650        // Collect all transactions from all relayers using their sorted sets
651        let mut all_transactions = Vec::new();
652        for relayer_id in relayer_ids {
653            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
654            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
655                .arg(&relayer_sorted_key)
656                .arg(0)
657                .arg(-1)
658                .arg("REV")
659                .query_async(&mut conn)
660                .await
661                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
662
663            let batch_result = self.get_transactions_by_ids(&tx_ids).await?;
664            all_transactions.extend(batch_result.results);
665        }
666
667        // Sort all transactions by created_at (newest first)
668        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
669
670        debug!(count = %all_transactions.len(), "found transactions");
671        Ok(all_transactions)
672    }
673
674    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
675    async fn list_paginated(
676        &self,
677        query: PaginationQuery,
678    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
679        if query.per_page == 0 {
680            return Err(RepositoryError::InvalidData(
681                "per_page must be greater than 0".to_string(),
682            ));
683        }
684
685        let mut conn = self.client.as_ref().clone();
686
687        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
688
689        // Get all relayer IDs
690        let relayer_list_key = self.relayer_list_key();
691        let relayer_ids: Vec<String> = conn
692            .smembers(&relayer_list_key)
693            .await
694            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
695
696        // Collect all transactions from all relayers using their sorted sets
697        let mut all_transactions = Vec::new();
698        for relayer_id in relayer_ids {
699            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
700            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
701                .arg(&relayer_sorted_key)
702                .arg(0)
703                .arg(-1)
704                .arg("REV")
705                .query_async(&mut conn)
706                .await
707                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
708
709            let batch_result = self.get_transactions_by_ids(&tx_ids).await?;
710            all_transactions.extend(batch_result.results);
711        }
712
713        // Sort all transactions by created_at (newest first)
714        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
715
716        let total = all_transactions.len() as u64;
717        let start = ((query.page - 1) * query.per_page) as usize;
718        let end = (start + query.per_page as usize).min(all_transactions.len());
719
720        if start >= all_transactions.len() {
721            debug!(page = %query.page, total = %total, "page is beyond available data");
722            return Ok(PaginatedResult {
723                items: vec![],
724                total,
725                page: query.page,
726                per_page: query.per_page,
727            });
728        }
729
730        let items = all_transactions[start..end].to_vec();
731
732        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
733
734        Ok(PaginatedResult {
735            items,
736            total,
737            page: query.page,
738            per_page: query.per_page,
739        })
740    }
741
742    async fn update(
743        &self,
744        id: String,
745        entity: TransactionRepoModel,
746    ) -> Result<TransactionRepoModel, RepositoryError> {
747        if id.is_empty() {
748            return Err(RepositoryError::InvalidData(
749                "Transaction ID cannot be empty".to_string(),
750            ));
751        }
752
753        debug!(tx_id = %id, "updating transaction");
754
755        // Get the old transaction for index cleanup
756        let old_tx = self.get_by_id(id.clone()).await?;
757
758        let key = self.tx_key(&entity.relayer_id, &id);
759        let mut conn = self.client.as_ref().clone();
760
761        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
762
763        // Update transaction
764        let _: () = conn
765            .set(&key, value)
766            .await
767            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
768
769        // Update indexes
770        self.update_indexes(&entity, Some(&old_tx)).await?;
771
772        debug!(tx_id = %id, "successfully updated transaction");
773        Ok(entity)
774    }
775
776    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
777        if id.is_empty() {
778            return Err(RepositoryError::InvalidData(
779                "Transaction ID cannot be empty".to_string(),
780            ));
781        }
782
783        debug!(tx_id = %id, "deleting transaction");
784
785        // Get transaction first for index cleanup
786        let tx = self.get_by_id(id.clone()).await?;
787
788        let key = self.tx_key(&tx.relayer_id, &id);
789        let reverse_key = self.tx_to_relayer_key(&id);
790        let mut conn = self.client.as_ref().clone();
791
792        let mut pipe = redis::pipe();
793        pipe.atomic();
794        pipe.del(&key);
795        pipe.del(&reverse_key);
796
797        pipe.exec_async(&mut conn)
798            .await
799            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
800
801        // Remove indexes (log errors but don't fail the delete)
802        if let Err(e) = self.remove_all_indexes(&tx).await {
803            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
804        }
805
806        debug!(tx_id = %id, "successfully deleted transaction");
807        Ok(())
808    }
809
810    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
811    async fn count(&self) -> Result<usize, RepositoryError> {
812        let mut conn = self.client.as_ref().clone();
813
814        debug!("counting transactions");
815
816        // Get all relayer IDs and sum their sorted set counts
817        let relayer_list_key = self.relayer_list_key();
818        let relayer_ids: Vec<String> = conn
819            .smembers(&relayer_list_key)
820            .await
821            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
822
823        let mut total_count = 0usize;
824        for relayer_id in relayer_ids {
825            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
826            let count: usize = conn
827                .zcard(&relayer_sorted_key)
828                .await
829                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
830            total_count += count;
831        }
832
833        debug!(count = %total_count, "transaction count");
834        Ok(total_count)
835    }
836
837    async fn has_entries(&self) -> Result<bool, RepositoryError> {
838        let mut conn = self.client.as_ref().clone();
839        let relayer_list_key = self.relayer_list_key();
840
841        debug!("checking if transaction entries exist");
842
843        let exists: bool = conn
844            .exists(&relayer_list_key)
845            .await
846            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
847
848        debug!(exists = %exists, "transaction entries exist");
849        Ok(exists)
850    }
851
852    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
853        let mut conn = self.client.as_ref().clone();
854        let relayer_list_key = self.relayer_list_key();
855
856        debug!("dropping all transaction entries");
857
858        // Get all relayer IDs first
859        let relayer_ids: Vec<String> = conn
860            .smembers(&relayer_list_key)
861            .await
862            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
863
864        if relayer_ids.is_empty() {
865            debug!("no transaction entries to drop");
866            return Ok(());
867        }
868
869        // Use pipeline for atomic operations
870        let mut pipe = redis::pipe();
871        pipe.atomic();
872
873        // Delete all transactions and their indexes for each relayer
874        for relayer_id in &relayer_ids {
875            // Get all transaction IDs for this relayer
876            let pattern = format!(
877                "{}:{}:{}:{}:*",
878                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
879            );
880            let mut cursor = 0;
881            let mut tx_ids = Vec::new();
882
883            loop {
884                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
885                    .cursor_arg(cursor)
886                    .arg("MATCH")
887                    .arg(&pattern)
888                    .query_async(&mut conn)
889                    .await
890                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
891
892                // Extract transaction IDs from keys and delete keys
893                for key in keys {
894                    pipe.del(&key);
895                    if let Some(tx_id) = key.split(':').next_back() {
896                        tx_ids.push(tx_id.to_string());
897                    }
898                }
899
900                cursor = next_cursor;
901                if cursor == 0 {
902                    break;
903                }
904            }
905
906            // Delete reverse lookup keys and indexes
907            for tx_id in tx_ids {
908                let reverse_key = self.tx_to_relayer_key(&tx_id);
909                pipe.del(&reverse_key);
910
911                // Delete status indexes (we can't know the specific status, so we'll clean up known ones)
912                for status in &[
913                    TransactionStatus::Pending,
914                    TransactionStatus::Sent,
915                    TransactionStatus::Confirmed,
916                    TransactionStatus::Failed,
917                    TransactionStatus::Canceled,
918                ] {
919                    let status_key = self.relayer_status_key(relayer_id, status);
920                    pipe.srem(&status_key, &tx_id);
921                }
922            }
923
924            // Delete the relayer's sorted set by created_at
925            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
926            pipe.del(&relayer_sorted_key);
927        }
928
929        // Delete the relayer list key
930        pipe.del(&relayer_list_key);
931
932        pipe.exec_async(&mut conn)
933            .await
934            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
935
936        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
937        Ok(())
938    }
939}
940
941#[async_trait]
942impl TransactionRepository for RedisTransactionRepository {
943    async fn find_by_relayer_id(
944        &self,
945        relayer_id: &str,
946        query: PaginationQuery,
947    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
948        let mut conn = self.client.as_ref().clone();
949
950        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
951
952        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
953
954        // Get total count from relayer's sorted set
955        let mut sorted_set_count: u64 = conn
956            .zcard(&relayer_sorted_key)
957            .await
958            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
959
960        // Check if migration is needed to new sorted set index and perform it if necessary
961        // TODO: Remove migration check in the future
962        if sorted_set_count == 0 {
963            match self.check_and_migrate_if_needed(relayer_id).await {
964                Ok(Some(count)) => {
965                    sorted_set_count = count;
966                }
967                Ok(None) => {
968                    // No transactions exist or migration didn't add anything
969                    // Try fallback to see if there are any transactions via SCAN
970                    return self.find_by_relayer_id_fallback(relayer_id, query).await;
971                }
972                Err(_) => {
973                    // Migration failed, fall back to SCAN method
974                    warn!(relayer_id = %relayer_id, "migration failed, falling back to SCAN method");
975                    return self.find_by_relayer_id_fallback(relayer_id, query).await;
976                }
977            }
978        }
979
980        let total = sorted_set_count;
981
982        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
983        let start = ((query.page - 1) * query.per_page) as isize;
984        let end = start + query.per_page as isize - 1;
985
986        if start as u64 >= total {
987            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
988            return Ok(PaginatedResult {
989                items: vec![],
990                total,
991                page: query.page,
992                per_page: query.per_page,
993            });
994        }
995
996        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
997        let page_ids: Vec<String> = redis::cmd("ZRANGE")
998            .arg(&relayer_sorted_key)
999            .arg(start)
1000            .arg(end)
1001            .arg("REV")
1002            .query_async(&mut conn)
1003            .await
1004            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1005
1006        let items = self.get_transactions_by_ids(&page_ids).await?;
1007
1008        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1009
1010        Ok(PaginatedResult {
1011            items: items.results,
1012            total,
1013            page: query.page,
1014            per_page: query.per_page,
1015        })
1016    }
1017
1018    async fn find_by_status(
1019        &self,
1020        relayer_id: &str,
1021        statuses: &[TransactionStatus],
1022    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1023        let mut conn = self.client.as_ref().clone();
1024        let mut all_ids = Vec::new();
1025
1026        // Collect IDs from all status sets
1027        for status in statuses {
1028            let status_key = self.relayer_status_key(relayer_id, status);
1029            let ids: Vec<String> = conn
1030                .smembers(status_key)
1031                .await
1032                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1033
1034            all_ids.extend(ids);
1035        }
1036
1037        // Remove duplicates and batch fetch
1038        all_ids.sort();
1039        all_ids.dedup();
1040
1041        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1042
1043        // Sort by created_at descending (newest first)
1044        transactions
1045            .results
1046            .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1047
1048        Ok(transactions.results)
1049    }
1050
1051    async fn find_by_nonce(
1052        &self,
1053        relayer_id: &str,
1054        nonce: u64,
1055    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1056        let mut conn = self.client.as_ref().clone();
1057        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1058
1059        // Get transaction ID with this nonce for this relayer (should be single value)
1060        let tx_id: Option<String> = conn
1061            .get(nonce_key)
1062            .await
1063            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1064
1065        match tx_id {
1066            Some(tx_id) => {
1067                match self.get_by_id(tx_id.clone()).await {
1068                    Ok(tx) => Ok(Some(tx)),
1069                    Err(RepositoryError::NotFound(_)) => {
1070                        // Transaction was deleted but index wasn't cleaned up
1071                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1072                        Ok(None)
1073                    }
1074                    Err(e) => Err(e),
1075                }
1076            }
1077            None => Ok(None),
1078        }
1079    }
1080
1081    async fn update_status(
1082        &self,
1083        tx_id: String,
1084        status: TransactionStatus,
1085    ) -> Result<TransactionRepoModel, RepositoryError> {
1086        let update = TransactionUpdateRequest {
1087            status: Some(status),
1088            ..Default::default()
1089        };
1090        self.partial_update(tx_id, update).await
1091    }
1092
1093    async fn partial_update(
1094        &self,
1095        tx_id: String,
1096        update: TransactionUpdateRequest,
1097    ) -> Result<TransactionRepoModel, RepositoryError> {
1098        const MAX_RETRIES: u32 = 3;
1099        const BACKOFF_MS: u64 = 100;
1100
1101        // Fetch the original transaction state ONCE before retrying.
1102        // This is critical: if conn.set() succeeds but update_indexes() fails,
1103        // subsequent retries must still reference the original state to remove
1104        // stale index entries. Otherwise, get_by_id() returns the already-updated
1105        // record and update_indexes() skips removing the old indexes.
1106        let original_tx = self.get_by_id(tx_id.clone()).await?;
1107        let mut updated_tx = original_tx.clone();
1108        updated_tx.apply_partial_update(update.clone());
1109
1110        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
1111        let value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1112
1113        let mut last_error = None;
1114
1115        for attempt in 0..MAX_RETRIES {
1116            let mut conn = self.client.as_ref().clone();
1117
1118            // Try to update transaction data
1119            let result: Result<(), _> = conn.set(&key, &value).await;
1120            match result {
1121                Ok(_) => {}
1122                Err(e) => {
1123                    if attempt < MAX_RETRIES - 1 {
1124                        warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to set transaction data, retrying");
1125                        last_error = Some(self.map_redis_error(e, "partial_update"));
1126                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1127                        continue;
1128                    }
1129                    return Err(self.map_redis_error(e, "partial_update"));
1130                }
1131            }
1132
1133            // Try to update indexes with the original pre-update state
1134            // This ensures stale indexes are removed even on retry attempts
1135            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
1136                Ok(_) => {
1137                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
1138                    return Ok(updated_tx);
1139                }
1140                Err(e) if attempt < MAX_RETRIES - 1 => {
1141                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
1142                    last_error = Some(e);
1143                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1144                    continue;
1145                }
1146                Err(e) => return Err(e),
1147            }
1148        }
1149
1150        Err(last_error.unwrap_or_else(|| {
1151            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
1152        }))
1153    }
1154
1155    async fn update_network_data(
1156        &self,
1157        tx_id: String,
1158        network_data: NetworkTransactionData,
1159    ) -> Result<TransactionRepoModel, RepositoryError> {
1160        let update = TransactionUpdateRequest {
1161            network_data: Some(network_data),
1162            ..Default::default()
1163        };
1164        self.partial_update(tx_id, update).await
1165    }
1166
1167    async fn set_sent_at(
1168        &self,
1169        tx_id: String,
1170        sent_at: String,
1171    ) -> Result<TransactionRepoModel, RepositoryError> {
1172        let update = TransactionUpdateRequest {
1173            sent_at: Some(sent_at),
1174            ..Default::default()
1175        };
1176        self.partial_update(tx_id, update).await
1177    }
1178
1179    async fn set_confirmed_at(
1180        &self,
1181        tx_id: String,
1182        confirmed_at: String,
1183    ) -> Result<TransactionRepoModel, RepositoryError> {
1184        let update = TransactionUpdateRequest {
1185            confirmed_at: Some(confirmed_at),
1186            ..Default::default()
1187        };
1188        self.partial_update(tx_id, update).await
1189    }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194    use super::*;
1195    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
1196    use alloy::primitives::U256;
1197    use lazy_static::lazy_static;
1198    use redis::Client;
1199    use std::str::FromStr;
1200    use tokio;
1201    use uuid::Uuid;
1202
1203    use tokio::sync::Mutex;
1204
1205    // Use a mutex to ensure tests don't run in parallel when modifying env vars
1206    lazy_static! {
1207        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1208    }
1209
1210    // Helper function to create test transactions
1211    fn create_test_transaction(id: &str) -> TransactionRepoModel {
1212        TransactionRepoModel {
1213            id: id.to_string(),
1214            relayer_id: "relayer-1".to_string(),
1215            status: TransactionStatus::Pending,
1216            status_reason: None,
1217            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
1218            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1219            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1220            valid_until: None,
1221            delete_at: None,
1222            network_type: NetworkType::Evm,
1223            priced_at: None,
1224            hashes: vec![],
1225            network_data: NetworkTransactionData::Evm(EvmTransactionData {
1226                gas_price: Some(1000000000),
1227                gas_limit: Some(21000),
1228                nonce: Some(1),
1229                value: U256::from_str("1000000000000000000").unwrap(),
1230                data: Some("0x".to_string()),
1231                from: "0xSender".to_string(),
1232                to: Some("0xRecipient".to_string()),
1233                chain_id: 1,
1234                signature: None,
1235                hash: Some(format!("0x{}", id)),
1236                speed: Some(Speed::Fast),
1237                max_fee_per_gas: None,
1238                max_priority_fee_per_gas: None,
1239                raw: None,
1240            }),
1241            noop_count: None,
1242            is_canceled: Some(false),
1243        }
1244    }
1245
1246    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1247        let mut tx = create_test_transaction(id);
1248        tx.relayer_id = relayer_id.to_string();
1249        tx
1250    }
1251
1252    fn create_test_transaction_with_status(
1253        id: &str,
1254        relayer_id: &str,
1255        status: TransactionStatus,
1256    ) -> TransactionRepoModel {
1257        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1258        tx.status = status;
1259        tx
1260    }
1261
1262    fn create_test_transaction_with_nonce(
1263        id: &str,
1264        nonce: u64,
1265        relayer_id: &str,
1266    ) -> TransactionRepoModel {
1267        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1268        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1269            evm_data.nonce = Some(nonce);
1270        }
1271        tx
1272    }
1273
1274    async fn setup_test_repo() -> RedisTransactionRepository {
1275        // Use a mock Redis URL - in real integration tests, this would connect to a test Redis instance
1276        let redis_url = std::env::var("REDIS_TEST_URL")
1277            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1278
1279        let client = Client::open(redis_url).expect("Failed to create Redis client");
1280        let connection_manager = ConnectionManager::new(client)
1281            .await
1282            .expect("Failed to create connection manager");
1283
1284        let random_id = Uuid::new_v4().to_string();
1285        let key_prefix = format!("test_prefix:{}", random_id);
1286
1287        RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1288            .expect("Failed to create RedisTransactionRepository")
1289    }
1290
1291    #[tokio::test]
1292    #[ignore = "Requires active Redis instance"]
1293    async fn test_new_repository_creation() {
1294        let repo = setup_test_repo().await;
1295        assert!(repo.key_prefix.contains("test_prefix"));
1296    }
1297
1298    #[tokio::test]
1299    #[ignore = "Requires active Redis instance"]
1300    async fn test_new_repository_empty_prefix_fails() {
1301        let redis_url = std::env::var("REDIS_TEST_URL")
1302            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1303        let client = Client::open(redis_url).expect("Failed to create Redis client");
1304        let connection_manager = ConnectionManager::new(client)
1305            .await
1306            .expect("Failed to create connection manager");
1307
1308        let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1309        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1310    }
1311
1312    #[tokio::test]
1313    #[ignore = "Requires active Redis instance"]
1314    async fn test_key_generation() {
1315        let repo = setup_test_repo().await;
1316
1317        assert!(repo
1318            .tx_key("relayer-1", "test-id")
1319            .contains(":relayer:relayer-1:tx:test-id"));
1320        assert!(repo
1321            .tx_to_relayer_key("test-id")
1322            .contains(":relayer:tx_to_relayer:test-id"));
1323        assert!(repo.relayer_list_key().contains(":relayer_list"));
1324        assert!(repo
1325            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1326            .contains(":relayer:relayer-1:status:Pending"));
1327        assert!(repo
1328            .relayer_nonce_key("relayer-1", 42)
1329            .contains(":relayer:relayer-1:nonce:42"));
1330    }
1331
1332    #[tokio::test]
1333    #[ignore = "Requires active Redis instance"]
1334    async fn test_serialize_deserialize_transaction() {
1335        let repo = setup_test_repo().await;
1336        let tx = create_test_transaction("test-1");
1337
1338        let serialized = repo
1339            .serialize_entity(&tx, |t| &t.id, "transaction")
1340            .expect("Serialization should succeed");
1341        let deserialized: TransactionRepoModel = repo
1342            .deserialize_entity(&serialized, "test-1", "transaction")
1343            .expect("Deserialization should succeed");
1344
1345        assert_eq!(tx.id, deserialized.id);
1346        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1347        assert_eq!(tx.status, deserialized.status);
1348    }
1349
1350    #[tokio::test]
1351    #[ignore = "Requires active Redis instance"]
1352    async fn test_extract_nonce() {
1353        let repo = setup_test_repo().await;
1354        let random_id = Uuid::new_v4().to_string();
1355        let relayer_id = Uuid::new_v4().to_string();
1356        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1357
1358        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1359        assert_eq!(nonce, Some(42));
1360    }
1361
1362    #[tokio::test]
1363    #[ignore = "Requires active Redis instance"]
1364    async fn test_create_transaction() {
1365        let repo = setup_test_repo().await;
1366        let random_id = Uuid::new_v4().to_string();
1367        let tx = create_test_transaction(&random_id);
1368
1369        let result = repo.create(tx.clone()).await.unwrap();
1370        assert_eq!(result.id, tx.id);
1371    }
1372
1373    #[tokio::test]
1374    #[ignore = "Requires active Redis instance"]
1375    async fn test_get_transaction() {
1376        let repo = setup_test_repo().await;
1377        let random_id = Uuid::new_v4().to_string();
1378        let tx = create_test_transaction(&random_id);
1379
1380        repo.create(tx.clone()).await.unwrap();
1381        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1382        assert_eq!(stored.id, tx.id);
1383        assert_eq!(stored.relayer_id, tx.relayer_id);
1384    }
1385
1386    #[tokio::test]
1387    #[ignore = "Requires active Redis instance"]
1388    async fn test_update_transaction() {
1389        let repo = setup_test_repo().await;
1390        let random_id = Uuid::new_v4().to_string();
1391        let mut tx = create_test_transaction(&random_id);
1392
1393        repo.create(tx.clone()).await.unwrap();
1394        tx.status = TransactionStatus::Confirmed;
1395
1396        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1397        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1398    }
1399
1400    #[tokio::test]
1401    #[ignore = "Requires active Redis instance"]
1402    async fn test_delete_transaction() {
1403        let repo = setup_test_repo().await;
1404        let random_id = Uuid::new_v4().to_string();
1405        let tx = create_test_transaction(&random_id);
1406
1407        repo.create(tx).await.unwrap();
1408        repo.delete_by_id(random_id.to_string()).await.unwrap();
1409
1410        let result = repo.get_by_id(random_id.to_string()).await;
1411        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1412    }
1413
1414    #[tokio::test]
1415    #[ignore = "Requires active Redis instance"]
1416    async fn test_list_all_transactions() {
1417        let repo = setup_test_repo().await;
1418        let random_id = Uuid::new_v4().to_string();
1419        let random_id2 = Uuid::new_v4().to_string();
1420
1421        let tx1 = create_test_transaction(&random_id);
1422        let tx2 = create_test_transaction(&random_id2);
1423
1424        repo.create(tx1).await.unwrap();
1425        repo.create(tx2).await.unwrap();
1426
1427        let transactions = repo.list_all().await.unwrap();
1428        assert!(transactions.len() >= 2);
1429    }
1430
1431    #[tokio::test]
1432    #[ignore = "Requires active Redis instance"]
1433    async fn test_count_transactions() {
1434        let repo = setup_test_repo().await;
1435        let random_id = Uuid::new_v4().to_string();
1436        let tx = create_test_transaction(&random_id);
1437
1438        let count = repo.count().await.unwrap();
1439        repo.create(tx).await.unwrap();
1440        assert!(repo.count().await.unwrap() > count);
1441    }
1442
1443    #[tokio::test]
1444    #[ignore = "Requires active Redis instance"]
1445    async fn test_get_nonexistent_transaction() {
1446        let repo = setup_test_repo().await;
1447        let result = repo.get_by_id("nonexistent".to_string()).await;
1448        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1449    }
1450
1451    #[tokio::test]
1452    #[ignore = "Requires active Redis instance"]
1453    async fn test_duplicate_transaction_creation() {
1454        let repo = setup_test_repo().await;
1455        let random_id = Uuid::new_v4().to_string();
1456
1457        let tx = create_test_transaction(&random_id);
1458
1459        repo.create(tx.clone()).await.unwrap();
1460        let result = repo.create(tx).await;
1461
1462        assert!(matches!(
1463            result,
1464            Err(RepositoryError::ConstraintViolation(_))
1465        ));
1466    }
1467
1468    #[tokio::test]
1469    #[ignore = "Requires active Redis instance"]
1470    async fn test_update_nonexistent_transaction() {
1471        let repo = setup_test_repo().await;
1472        let tx = create_test_transaction("test-1");
1473
1474        let result = repo.update("nonexistent".to_string(), tx).await;
1475        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1476    }
1477
1478    #[tokio::test]
1479    #[ignore = "Requires active Redis instance"]
1480    async fn test_list_paginated() {
1481        let repo = setup_test_repo().await;
1482
1483        // Create multiple transactions
1484        for _ in 1..=10 {
1485            let random_id = Uuid::new_v4().to_string();
1486            let tx = create_test_transaction(&random_id);
1487            repo.create(tx).await.unwrap();
1488        }
1489
1490        // Test first page with 3 items per page
1491        let query = PaginationQuery {
1492            page: 1,
1493            per_page: 3,
1494        };
1495        let result = repo.list_paginated(query).await.unwrap();
1496        assert_eq!(result.items.len(), 3);
1497        assert!(result.total >= 10);
1498        assert_eq!(result.page, 1);
1499        assert_eq!(result.per_page, 3);
1500
1501        // Test empty page (beyond total items)
1502        let query = PaginationQuery {
1503            page: 1000,
1504            per_page: 3,
1505        };
1506        let result = repo.list_paginated(query).await.unwrap();
1507        assert_eq!(result.items.len(), 0);
1508    }
1509
1510    #[tokio::test]
1511    #[ignore = "Requires active Redis instance"]
1512    async fn test_find_by_relayer_id() {
1513        let repo = setup_test_repo().await;
1514        let random_id = Uuid::new_v4().to_string();
1515        let random_id2 = Uuid::new_v4().to_string();
1516        let random_id3 = Uuid::new_v4().to_string();
1517
1518        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1519        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1520        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1521
1522        repo.create(tx1).await.unwrap();
1523        repo.create(tx2).await.unwrap();
1524        repo.create(tx3).await.unwrap();
1525
1526        // Test finding transactions for relayer-1
1527        let query = PaginationQuery {
1528            page: 1,
1529            per_page: 10,
1530        };
1531        let result = repo
1532            .find_by_relayer_id("relayer-1", query.clone())
1533            .await
1534            .unwrap();
1535        assert!(result.total >= 2);
1536        assert!(result.items.len() >= 2);
1537        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1538
1539        // Test finding transactions for relayer-2
1540        let result = repo
1541            .find_by_relayer_id("relayer-2", query.clone())
1542            .await
1543            .unwrap();
1544        assert!(result.total >= 1);
1545        assert!(!result.items.is_empty());
1546        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1547
1548        // Test finding transactions for non-existent relayer
1549        let result = repo
1550            .find_by_relayer_id("non-existent", query.clone())
1551            .await
1552            .unwrap();
1553        assert_eq!(result.total, 0);
1554        assert_eq!(result.items.len(), 0);
1555    }
1556
1557    #[tokio::test]
1558    #[ignore = "Requires active Redis instance"]
1559    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
1560        let repo = setup_test_repo().await;
1561        let relayer_id = Uuid::new_v4().to_string();
1562
1563        // Create transactions with different created_at timestamps
1564        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
1565        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
1566
1567        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
1568        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
1569
1570        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
1571        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
1572
1573        // Create transactions in non-chronological order to ensure sorting works
1574        repo.create(tx2.clone()).await.unwrap(); // Middle first
1575        repo.create(tx1.clone()).await.unwrap(); // Oldest second
1576        repo.create(tx3.clone()).await.unwrap(); // Newest last
1577
1578        let query = PaginationQuery {
1579            page: 1,
1580            per_page: 10,
1581        };
1582        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
1583
1584        assert_eq!(result.total, 3);
1585        assert_eq!(result.items.len(), 3);
1586
1587        // Verify transactions are sorted by created_at descending (newest first)
1588        assert_eq!(
1589            result.items[0].id, "test-3",
1590            "First item should be newest (test-3)"
1591        );
1592        assert_eq!(
1593            result.items[0].created_at,
1594            "2025-01-27T14:00:00.000000+00:00"
1595        );
1596
1597        assert_eq!(
1598            result.items[1].id, "test-2",
1599            "Second item should be middle (test-2)"
1600        );
1601        assert_eq!(
1602            result.items[1].created_at,
1603            "2025-01-27T12:00:00.000000+00:00"
1604        );
1605
1606        assert_eq!(
1607            result.items[2].id, "test-1",
1608            "Third item should be oldest (test-1)"
1609        );
1610        assert_eq!(
1611            result.items[2].created_at,
1612            "2025-01-27T10:00:00.000000+00:00"
1613        );
1614    }
1615
1616    #[tokio::test]
1617    #[ignore = "Requires active Redis instance"]
1618    async fn test_find_by_relayer_id_migration_from_old_index() {
1619        let repo = setup_test_repo().await;
1620        let relayer_id = Uuid::new_v4().to_string();
1621
1622        // Create transactions with different created_at timestamps
1623        let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
1624        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
1625
1626        let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
1627        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
1628
1629        let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
1630        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
1631
1632        // Create transactions directly in Redis WITHOUT adding to sorted set
1633        // This simulates old transactions created before the sorted set index existed
1634        let mut conn = repo.client.as_ref().clone();
1635        let relayer_list_key = repo.relayer_list_key();
1636        let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();
1637
1638        for tx in &[&tx1, &tx2, &tx3] {
1639            let key = repo.tx_key(&tx.relayer_id, &tx.id);
1640            let reverse_key = repo.tx_to_relayer_key(&tx.id);
1641            let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();
1642
1643            let mut pipe = redis::pipe();
1644            pipe.atomic();
1645            pipe.set(&key, &value);
1646            pipe.set(&reverse_key, &tx.relayer_id);
1647
1648            // Add to status index (but NOT to sorted set)
1649            let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
1650            pipe.sadd(&status_key, &tx.id);
1651
1652            pipe.exec_async(&mut conn).await.unwrap();
1653        }
1654
1655        // Verify sorted set is empty (transactions were created without sorted set index)
1656        let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
1657        let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
1658        assert_eq!(count, 0, "Sorted set should be empty for old transactions");
1659
1660        // Call find_by_relayer_id - this should trigger migration
1661        let query = PaginationQuery {
1662            page: 1,
1663            per_page: 10,
1664        };
1665        let result = repo
1666            .find_by_relayer_id(&relayer_id, query.clone())
1667            .await
1668            .unwrap();
1669
1670        // Verify migration happened - sorted set should now have entries
1671        let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
1672        assert_eq!(
1673            count_after, 3,
1674            "Sorted set should be populated after migration"
1675        );
1676
1677        // Verify results are correct and sorted (newest first)
1678        assert_eq!(result.total, 3);
1679        assert_eq!(result.items.len(), 3);
1680
1681        assert_eq!(
1682            result.items[0].id, "migrate-test-3",
1683            "First item should be newest after migration"
1684        );
1685        assert_eq!(
1686            result.items[0].created_at,
1687            "2025-01-27T14:00:00.000000+00:00"
1688        );
1689
1690        assert_eq!(
1691            result.items[1].id, "migrate-test-2",
1692            "Second item should be middle after migration"
1693        );
1694        assert_eq!(
1695            result.items[1].created_at,
1696            "2025-01-27T12:00:00.000000+00:00"
1697        );
1698
1699        assert_eq!(
1700            result.items[2].id, "migrate-test-1",
1701            "Third item should be oldest after migration"
1702        );
1703        assert_eq!(
1704            result.items[2].created_at,
1705            "2025-01-27T10:00:00.000000+00:00"
1706        );
1707
1708        // Verify second call uses sorted set (no migration needed)
1709        let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
1710        assert_eq!(result2.total, 3);
1711        assert_eq!(result2.items.len(), 3);
1712        // Results should be identical since sorted set is now populated
1713        assert_eq!(result.items[0].id, result2.items[0].id);
1714    }
1715
1716    #[tokio::test]
1717    #[ignore = "Requires active Redis instance"]
1718    async fn test_find_by_status() {
1719        let repo = setup_test_repo().await;
1720        let random_id = Uuid::new_v4().to_string();
1721        let random_id2 = Uuid::new_v4().to_string();
1722        let random_id3 = Uuid::new_v4().to_string();
1723        let relayer_id = Uuid::new_v4().to_string();
1724        let tx1 = create_test_transaction_with_status(
1725            &random_id,
1726            &relayer_id,
1727            TransactionStatus::Pending,
1728        );
1729        let tx2 =
1730            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1731        let tx3 = create_test_transaction_with_status(
1732            &random_id3,
1733            &relayer_id,
1734            TransactionStatus::Confirmed,
1735        );
1736
1737        repo.create(tx1).await.unwrap();
1738        repo.create(tx2).await.unwrap();
1739        repo.create(tx3).await.unwrap();
1740
1741        // Test finding pending transactions
1742        let result = repo
1743            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1744            .await
1745            .unwrap();
1746        assert_eq!(result.len(), 1);
1747        assert_eq!(result[0].status, TransactionStatus::Pending);
1748
1749        // Test finding multiple statuses
1750        let result = repo
1751            .find_by_status(
1752                &relayer_id,
1753                &[TransactionStatus::Pending, TransactionStatus::Sent],
1754            )
1755            .await
1756            .unwrap();
1757        assert_eq!(result.len(), 2);
1758
1759        // Test finding non-existent status
1760        let result = repo
1761            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1762            .await
1763            .unwrap();
1764        assert_eq!(result.len(), 0);
1765    }
1766
1767    #[tokio::test]
1768    #[ignore = "Requires active Redis instance"]
1769    async fn test_find_by_nonce() {
1770        let repo = setup_test_repo().await;
1771        let random_id = Uuid::new_v4().to_string();
1772        let random_id2 = Uuid::new_v4().to_string();
1773        let relayer_id = Uuid::new_v4().to_string();
1774
1775        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1776        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1777
1778        repo.create(tx1.clone()).await.unwrap();
1779        repo.create(tx2).await.unwrap();
1780
1781        // Test finding existing nonce
1782        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1783        assert!(result.is_some());
1784        assert_eq!(result.unwrap().id, random_id);
1785
1786        // Test finding non-existent nonce
1787        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1788        assert!(result.is_none());
1789
1790        // Test finding nonce for non-existent relayer
1791        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1792        assert!(result.is_none());
1793    }
1794
1795    #[tokio::test]
1796    #[ignore = "Requires active Redis instance"]
1797    async fn test_update_status() {
1798        let repo = setup_test_repo().await;
1799        let random_id = Uuid::new_v4().to_string();
1800        let tx = create_test_transaction(&random_id);
1801
1802        repo.create(tx).await.unwrap();
1803        let updated = repo
1804            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1805            .await
1806            .unwrap();
1807        assert_eq!(updated.status, TransactionStatus::Confirmed);
1808    }
1809
1810    #[tokio::test]
1811    #[ignore = "Requires active Redis instance"]
1812    async fn test_partial_update() {
1813        let repo = setup_test_repo().await;
1814        let random_id = Uuid::new_v4().to_string();
1815        let tx = create_test_transaction(&random_id);
1816
1817        repo.create(tx).await.unwrap();
1818
1819        let update = TransactionUpdateRequest {
1820            status: Some(TransactionStatus::Sent),
1821            status_reason: Some("Transaction sent".to_string()),
1822            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1823            confirmed_at: None,
1824            network_data: None,
1825            hashes: None,
1826            is_canceled: None,
1827            priced_at: None,
1828            noop_count: None,
1829            delete_at: None,
1830        };
1831
1832        let updated = repo
1833            .partial_update(random_id.to_string(), update)
1834            .await
1835            .unwrap();
1836        assert_eq!(updated.status, TransactionStatus::Sent);
1837        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1838        assert_eq!(
1839            updated.sent_at,
1840            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1841        );
1842    }
1843
1844    #[tokio::test]
1845    #[ignore = "Requires active Redis instance"]
1846    async fn test_set_sent_at() {
1847        let repo = setup_test_repo().await;
1848        let random_id = Uuid::new_v4().to_string();
1849        let tx = create_test_transaction(&random_id);
1850
1851        repo.create(tx).await.unwrap();
1852        let updated = repo
1853            .set_sent_at(
1854                random_id.to_string(),
1855                "2025-01-27T16:00:00.000000+00:00".to_string(),
1856            )
1857            .await
1858            .unwrap();
1859        assert_eq!(
1860            updated.sent_at,
1861            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1862        );
1863    }
1864
1865    #[tokio::test]
1866    #[ignore = "Requires active Redis instance"]
1867    async fn test_set_confirmed_at() {
1868        let repo = setup_test_repo().await;
1869        let random_id = Uuid::new_v4().to_string();
1870        let tx = create_test_transaction(&random_id);
1871
1872        repo.create(tx).await.unwrap();
1873        let updated = repo
1874            .set_confirmed_at(
1875                random_id.to_string(),
1876                "2025-01-27T16:00:00.000000+00:00".to_string(),
1877            )
1878            .await
1879            .unwrap();
1880        assert_eq!(
1881            updated.confirmed_at,
1882            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1883        );
1884    }
1885
1886    #[tokio::test]
1887    #[ignore = "Requires active Redis instance"]
1888    async fn test_update_network_data() {
1889        let repo = setup_test_repo().await;
1890        let random_id = Uuid::new_v4().to_string();
1891        let tx = create_test_transaction(&random_id);
1892
1893        repo.create(tx).await.unwrap();
1894
1895        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1896            gas_price: Some(2000000000),
1897            gas_limit: Some(42000),
1898            nonce: Some(2),
1899            value: U256::from_str("2000000000000000000").unwrap(),
1900            data: Some("0x1234".to_string()),
1901            from: "0xNewSender".to_string(),
1902            to: Some("0xNewRecipient".to_string()),
1903            chain_id: 1,
1904            signature: None,
1905            hash: Some("0xnewhash".to_string()),
1906            speed: Some(Speed::SafeLow),
1907            max_fee_per_gas: None,
1908            max_priority_fee_per_gas: None,
1909            raw: None,
1910        });
1911
1912        let updated = repo
1913            .update_network_data(random_id.to_string(), new_network_data.clone())
1914            .await
1915            .unwrap();
1916        assert_eq!(
1917            updated
1918                .network_data
1919                .get_evm_transaction_data()
1920                .unwrap()
1921                .hash,
1922            new_network_data.get_evm_transaction_data().unwrap().hash
1923        );
1924    }
1925
1926    #[tokio::test]
1927    #[ignore = "Requires active Redis instance"]
1928    async fn test_debug_implementation() {
1929        let repo = setup_test_repo().await;
1930        let debug_str = format!("{:?}", repo);
1931        assert!(debug_str.contains("RedisTransactionRepository"));
1932        assert!(debug_str.contains("test_prefix"));
1933    }
1934
1935    #[tokio::test]
1936    #[ignore = "Requires active Redis instance"]
1937    async fn test_error_handling_empty_id() {
1938        let repo = setup_test_repo().await;
1939
1940        let result = repo.get_by_id("".to_string()).await;
1941        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1942
1943        let result = repo
1944            .update("".to_string(), create_test_transaction("test"))
1945            .await;
1946        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1947
1948        let result = repo.delete_by_id("".to_string()).await;
1949        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1950    }
1951
1952    #[tokio::test]
1953    #[ignore = "Requires active Redis instance"]
1954    async fn test_pagination_validation() {
1955        let repo = setup_test_repo().await;
1956
1957        let query = PaginationQuery {
1958            page: 1,
1959            per_page: 0,
1960        };
1961        let result = repo.list_paginated(query).await;
1962        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1963    }
1964
1965    #[tokio::test]
1966    #[ignore = "Requires active Redis instance"]
1967    async fn test_index_consistency() {
1968        let repo = setup_test_repo().await;
1969        let random_id = Uuid::new_v4().to_string();
1970        let relayer_id = Uuid::new_v4().to_string();
1971        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1972
1973        // Create transaction
1974        repo.create(tx.clone()).await.unwrap();
1975
1976        // Verify it can be found by nonce
1977        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1978        assert!(found.is_some());
1979
1980        // Update the transaction with a new nonce
1981        let mut updated_tx = tx.clone();
1982        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1983            evm_data.nonce = Some(43);
1984        }
1985
1986        repo.update(random_id.to_string(), updated_tx)
1987            .await
1988            .unwrap();
1989
1990        // Verify old nonce index is cleaned up
1991        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1992        assert!(old_nonce_result.is_none());
1993
1994        // Verify new nonce index works
1995        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1996        assert!(new_nonce_result.is_some());
1997    }
1998
1999    #[tokio::test]
2000    #[ignore = "Requires active Redis instance"]
2001    async fn test_has_entries() {
2002        let repo = setup_test_repo().await;
2003        assert!(!repo.has_entries().await.unwrap());
2004
2005        let tx_id = uuid::Uuid::new_v4().to_string();
2006        let tx = create_test_transaction(&tx_id);
2007        repo.create(tx.clone()).await.unwrap();
2008
2009        assert!(repo.has_entries().await.unwrap());
2010    }
2011
2012    #[tokio::test]
2013    #[ignore = "Requires active Redis instance"]
2014    async fn test_drop_all_entries() {
2015        let repo = setup_test_repo().await;
2016        let tx_id = uuid::Uuid::new_v4().to_string();
2017        let tx = create_test_transaction(&tx_id);
2018        repo.create(tx.clone()).await.unwrap();
2019        assert!(repo.has_entries().await.unwrap());
2020
2021        repo.drop_all_entries().await.unwrap();
2022        assert!(!repo.has_entries().await.unwrap());
2023    }
2024
2025    // Tests for delete_at field setting on final status updates
2026    #[tokio::test]
2027    #[ignore = "Requires active Redis instance"]
2028    async fn test_update_status_sets_delete_at_for_final_statuses() {
2029        let _lock = ENV_MUTEX.lock().await;
2030
2031        use chrono::{DateTime, Duration, Utc};
2032        use std::env;
2033
2034        // Use a unique test environment variable to avoid conflicts
2035        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
2036
2037        let repo = setup_test_repo().await;
2038
2039        let final_statuses = [
2040            TransactionStatus::Canceled,
2041            TransactionStatus::Confirmed,
2042            TransactionStatus::Failed,
2043            TransactionStatus::Expired,
2044        ];
2045
2046        for (i, status) in final_statuses.iter().enumerate() {
2047            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
2048            let mut tx = create_test_transaction(&tx_id);
2049
2050            // Ensure transaction has no delete_at initially and is in pending state
2051            tx.delete_at = None;
2052            tx.status = TransactionStatus::Pending;
2053
2054            repo.create(tx).await.unwrap();
2055
2056            let before_update = Utc::now();
2057
2058            // Update to final status
2059            let updated = repo
2060                .update_status(tx_id.clone(), status.clone())
2061                .await
2062                .unwrap();
2063
2064            // Should have delete_at set
2065            assert!(
2066                updated.delete_at.is_some(),
2067                "delete_at should be set for status: {:?}",
2068                status
2069            );
2070
2071            // Verify the timestamp is reasonable (approximately 6 hours from now)
2072            let delete_at_str = updated.delete_at.unwrap();
2073            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2074                .expect("delete_at should be valid RFC3339")
2075                .with_timezone(&Utc);
2076
2077            let duration_from_before = delete_at.signed_duration_since(before_update);
2078            let expected_duration = Duration::hours(6);
2079            let tolerance = Duration::minutes(5);
2080
2081            assert!(
2082                duration_from_before >= expected_duration - tolerance &&
2083                duration_from_before <= expected_duration + tolerance,
2084                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
2085                status, duration_from_before
2086            );
2087        }
2088
2089        // Cleanup
2090        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2091    }
2092
2093    #[tokio::test]
2094    #[ignore = "Requires active Redis instance"]
2095    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
2096        let _lock = ENV_MUTEX.lock().await;
2097
2098        use std::env;
2099
2100        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
2101
2102        let repo = setup_test_repo().await;
2103
2104        let non_final_statuses = [
2105            TransactionStatus::Pending,
2106            TransactionStatus::Sent,
2107            TransactionStatus::Submitted,
2108            TransactionStatus::Mined,
2109        ];
2110
2111        for (i, status) in non_final_statuses.iter().enumerate() {
2112            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
2113            let mut tx = create_test_transaction(&tx_id);
2114            tx.delete_at = None;
2115            tx.status = TransactionStatus::Pending;
2116
2117            repo.create(tx).await.unwrap();
2118
2119            // Update to non-final status
2120            let updated = repo
2121                .update_status(tx_id.clone(), status.clone())
2122                .await
2123                .unwrap();
2124
2125            // Should NOT have delete_at set
2126            assert!(
2127                updated.delete_at.is_none(),
2128                "delete_at should NOT be set for status: {:?}",
2129                status
2130            );
2131        }
2132
2133        // Cleanup
2134        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2135    }
2136
2137    #[tokio::test]
2138    #[ignore = "Requires active Redis instance"]
2139    async fn test_partial_update_sets_delete_at_for_final_statuses() {
2140        let _lock = ENV_MUTEX.lock().await;
2141
2142        use chrono::{DateTime, Duration, Utc};
2143        use std::env;
2144
2145        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
2146
2147        let repo = setup_test_repo().await;
2148        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
2149        let mut tx = create_test_transaction(&tx_id);
2150        tx.delete_at = None;
2151        tx.status = TransactionStatus::Pending;
2152
2153        repo.create(tx).await.unwrap();
2154
2155        let before_update = Utc::now();
2156
2157        // Use partial_update to set status to Confirmed (final status)
2158        let update = TransactionUpdateRequest {
2159            status: Some(TransactionStatus::Confirmed),
2160            status_reason: Some("Transaction completed".to_string()),
2161            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
2162            ..Default::default()
2163        };
2164
2165        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
2166
2167        // Should have delete_at set
2168        assert!(
2169            updated.delete_at.is_some(),
2170            "delete_at should be set when updating to Confirmed status"
2171        );
2172
2173        // Verify the timestamp is reasonable (approximately 8 hours from now)
2174        let delete_at_str = updated.delete_at.unwrap();
2175        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2176            .expect("delete_at should be valid RFC3339")
2177            .with_timezone(&Utc);
2178
2179        let duration_from_before = delete_at.signed_duration_since(before_update);
2180        let expected_duration = Duration::hours(8);
2181        let tolerance = Duration::minutes(5);
2182
2183        assert!(
2184            duration_from_before >= expected_duration - tolerance
2185                && duration_from_before <= expected_duration + tolerance,
2186            "delete_at should be approximately 8 hours from now. Duration: {:?}",
2187            duration_from_before
2188        );
2189
2190        // Also verify other fields were updated
2191        assert_eq!(updated.status, TransactionStatus::Confirmed);
2192        assert_eq!(
2193            updated.status_reason,
2194            Some("Transaction completed".to_string())
2195        );
2196        assert_eq!(
2197            updated.confirmed_at,
2198            Some("2023-01-01T12:05:00Z".to_string())
2199        );
2200
2201        // Cleanup
2202        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2203    }
2204
2205    #[tokio::test]
2206    #[ignore = "Requires active Redis instance"]
2207    async fn test_update_status_preserves_existing_delete_at() {
2208        let _lock = ENV_MUTEX.lock().await;
2209
2210        use std::env;
2211
2212        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
2213
2214        let repo = setup_test_repo().await;
2215        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
2216        let mut tx = create_test_transaction(&tx_id);
2217
2218        // Set an existing delete_at value
2219        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
2220        tx.delete_at = Some(existing_delete_at.clone());
2221        tx.status = TransactionStatus::Pending;
2222
2223        repo.create(tx).await.unwrap();
2224
2225        // Update to final status
2226        let updated = repo
2227            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
2228            .await
2229            .unwrap();
2230
2231        // Should preserve the existing delete_at value
2232        assert_eq!(
2233            updated.delete_at,
2234            Some(existing_delete_at),
2235            "Existing delete_at should be preserved when updating to final status"
2236        );
2237
2238        // Cleanup
2239        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2240    }
2241    #[tokio::test]
2242    #[ignore = "Requires active Redis instance"]
2243    async fn test_partial_update_without_status_change_preserves_delete_at() {
2244        let _lock = ENV_MUTEX.lock().await;
2245
2246        use std::env;
2247
2248        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
2249
2250        let repo = setup_test_repo().await;
2251        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
2252        let mut tx = create_test_transaction(&tx_id);
2253        tx.delete_at = None;
2254        tx.status = TransactionStatus::Pending;
2255
2256        repo.create(tx).await.unwrap();
2257
2258        // First, update to final status to set delete_at
2259        let updated1 = repo
2260            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
2261            .await
2262            .unwrap();
2263
2264        assert!(updated1.delete_at.is_some());
2265        let original_delete_at = updated1.delete_at.clone();
2266
2267        // Now update other fields without changing status
2268        let update = TransactionUpdateRequest {
2269            status: None, // No status change
2270            status_reason: Some("Updated reason".to_string()),
2271            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
2272            ..Default::default()
2273        };
2274
2275        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
2276
2277        // delete_at should be preserved
2278        assert_eq!(
2279            updated2.delete_at, original_delete_at,
2280            "delete_at should be preserved when status is not updated"
2281        );
2282
2283        // Other fields should be updated
2284        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
2285        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
2286        assert_eq!(
2287            updated2.confirmed_at,
2288            Some("2023-01-01T12:10:00Z".to_string())
2289        );
2290
2291        // Cleanup
2292        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2293    }
2294}