1use crate::models::{
4 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5 TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9 BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use redis::aio::ConnectionManager;
13use redis::AsyncCommands;
14use std::fmt;
15use std::sync::Arc;
16use tracing::{debug, error, warn};
17
// Key namespace segments; every key this repository writes is built from
// `key_prefix` plus some of these, joined with ':'.
const RELAYER_PREFIX: &str = "relayer";
// Per-transaction data key segment: `{prefix}:relayer:{relayer_id}:tx:{tx_id}`.
const TX_PREFIX: &str = "tx";
// Per-relayer, per-status set of tx ids.
const STATUS_PREFIX: &str = "status";
// Per-relayer `nonce -> tx id` lookup key segment.
const NONCE_PREFIX: &str = "nonce";
// Reverse lookup: `tx id -> relayer id` (lets us find a tx without knowing its relayer).
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
// Global set of all relayer ids that have (or had) transactions.
const RELAYER_LIST_KEY: &str = "relayer_list";
// Per-relayer sorted set of tx ids scored by created_at in epoch millis.
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
25
/// Redis-backed implementation of the transaction repository.
///
/// All keys are namespaced under `key_prefix`. The `ConnectionManager` is
/// cheaply cloneable, so each operation clones it to obtain a connection.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    pub client: Arc<ConnectionManager>,
    pub key_prefix: String,
}
31
// Pulls in the shared helpers (serialize/deserialize entities, Redis error
// mapping) provided by the `RedisRepository` trait's default methods.
impl RedisRepository for RedisTransactionRepository {}
33
34impl RedisTransactionRepository {
35 pub fn new(
36 connection_manager: Arc<ConnectionManager>,
37 key_prefix: String,
38 ) -> Result<Self, RepositoryError> {
39 if key_prefix.is_empty() {
40 return Err(RepositoryError::InvalidData(
41 "Redis key prefix cannot be empty".to_string(),
42 ));
43 }
44
45 Ok(Self {
46 client: connection_manager,
47 key_prefix,
48 })
49 }
50
51 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
53 format!(
54 "{}:{}:{}:{}:{}",
55 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
56 )
57 }
58
59 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
61 format!(
62 "{}:{}:{}:{}",
63 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
64 )
65 }
66
67 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
69 format!(
70 "{}:{}:{}:{}:{}",
71 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
72 )
73 }
74
75 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
77 format!(
78 "{}:{}:{}:{}:{}",
79 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
80 )
81 }
82
83 fn relayer_list_key(&self) -> String {
85 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
86 }
87
88 fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
90 format!(
91 "{}:{}:{}:{}",
92 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
93 )
94 }
95
96 fn created_at_to_score(&self, created_at: &str) -> f64 {
98 chrono::DateTime::parse_from_rfc3339(created_at)
99 .map(|dt| dt.timestamp_millis() as f64)
100 .unwrap_or_else(|_| {
101 warn!(created_at = %created_at, "failed to parse created_at timestamp, using 0");
102 0.0
103 })
104 }
105
106 async fn get_transactions_by_ids(
108 &self,
109 ids: &[String],
110 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
111 if ids.is_empty() {
112 debug!("no transaction IDs provided for batch fetch");
113 return Ok(BatchRetrievalResult {
114 results: vec![],
115 failed_ids: vec![],
116 });
117 }
118
119 let mut conn = self.client.as_ref().clone();
120
121 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
122
123 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
124
125 let relayer_ids: Vec<Option<String>> = conn
126 .mget(&reverse_keys)
127 .await
128 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
129
130 let mut tx_keys = Vec::new();
131 let mut valid_ids = Vec::new();
132 let mut failed_ids = Vec::new();
133 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
134 match relayer_id {
135 Some(relayer_id) => {
136 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
137 valid_ids.push(ids[i].clone());
138 }
139 None => {
140 warn!(tx_id = %ids[i], "no relayer found for transaction");
141 failed_ids.push(ids[i].clone());
142 }
143 }
144 }
145
146 if tx_keys.is_empty() {
147 debug!("no valid transactions found for batch fetch");
148 return Ok(BatchRetrievalResult {
149 results: vec![],
150 failed_ids,
151 });
152 }
153
154 debug!(count = %tx_keys.len(), "batch fetching transaction data");
155
156 let values: Vec<Option<String>> = conn
157 .mget(&tx_keys)
158 .await
159 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
160
161 let mut transactions = Vec::new();
162 let mut failed_count = 0;
163 let mut failed_ids = Vec::new();
164 for (i, value) in values.into_iter().enumerate() {
165 match value {
166 Some(json) => {
167 match self.deserialize_entity::<TransactionRepoModel>(
168 &json,
169 &valid_ids[i],
170 "transaction",
171 ) {
172 Ok(tx) => transactions.push(tx),
173 Err(e) => {
174 failed_count += 1;
175 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
176 }
178 }
179 }
180 None => {
181 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
182 failed_ids.push(valid_ids[i].clone());
183 }
184 }
185 }
186
187 if failed_count > 0 {
188 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
189 }
190
191 debug!(count = %transactions.len(), "successfully fetched transactions");
192 Ok(BatchRetrievalResult {
193 results: transactions,
194 failed_ids,
195 })
196 }
197
198 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
200 match network_data.get_evm_transaction_data() {
201 Ok(tx_data) => tx_data.nonce,
202 Err(_) => {
203 debug!("no EVM transaction data available for nonce extraction");
204 None
205 }
206 }
207 }
208
    /// Refreshes all secondary indexes for `tx` in one atomic pipeline:
    /// relayer membership set, status set, nonce -> tx id key, and the
    /// created_at sorted set. When `old_tx` is given, stale status/nonce
    /// entries from the previous version are removed in the same pipeline.
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "updating indexes for transaction");

        // Track the relayer in the global relayer set (SADD is idempotent).
        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        // Add to the set for the tx's current status; the old status entry
        // (if different) is removed further down in the same pipeline.
        let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.sadd(&new_status_key, &tx.id);

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
        }

        // ZADD is idempotent per member, so re-adding on every update is safe.
        let created_at_score = self.created_at_to_score(&tx.created_at);
        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");

        if let Some(old) = old_tx {
            if old.status != tx.status {
                let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_key, &tx.id);
                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
            }

            // NOTE(review): cleanup uses `old.relayer_id`, which assumes a
            // transaction never moves between relayers — confirm relayer_id
            // is immutable for the lifetime of a transaction.
            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
                }
            }
        }

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
        Ok(())
    }
269
    /// Deletes every secondary index entry for `tx` (status set member, nonce
    /// key, created_at sorted-set member, reverse lookup) in one atomic
    /// pipeline. The primary transaction key itself is not touched here.
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "removing all indexes for transaction");

        // Only the set for the tx's *current* status is cleaned — assumes
        // indexes were kept consistent on every prior update.
        let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.srem(&status_key, &tx.id);

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
        }

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zrem(&relayer_sorted_key, &tx.id);
        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");

        // Drop the tx -> relayer reverse mapping as part of the same pipeline.
        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
        Ok(())
    }
306
    /// Detects legacy data: called when the relayer's sorted-set index is
    /// empty. Scans for per-tx keys; if any exist, backfills the sorted set
    /// and returns its new cardinality. Returns `None` when there is nothing
    /// to migrate (or migration produced an empty set).
    async fn check_and_migrate_if_needed(
        &self,
        relayer_id: &str,
    ) -> Result<Option<u64>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        let pattern = format!(
            "{}:{}:{}:{}:*",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
        );

        // Cursor-based SCAN (non-blocking, unlike KEYS). Stop as soon as one
        // matching key is found, or when the cursor wraps to 0 (full pass).
        let mut cursor = 0u64;
        let mut found_any = false;
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .cursor_arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "check_and_migrate_scan_check"))?;

            if !keys.is_empty() {
                found_any = true;
                break;
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }

        if !found_any {
            debug!(relayer_id = %relayer_id, "no transactions found for relayer");
            return Ok(None);
        }

        debug!(relayer_id = %relayer_id, "sorted set is empty but old transactions found, migrating to sorted set");
        if let Err(e) = self
            .migrate_relayer_transactions_to_sorted_set(relayer_id)
            .await
        {
            warn!(relayer_id = %relayer_id, error = %e, "failed to migrate transactions");
            return Err(e);
        }

        // Re-read the cardinality so the caller can paginate against it.
        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
        let new_count: u64 = conn
            .zcard(&relayer_sorted_key)
            .await
            .map_err(|e| self.map_redis_error(e, "check_and_migrate_count_after"))?;

        if new_count == 0 {
            debug!(relayer_id = %relayer_id, "migration completed but sorted set still empty");
            return Ok(None);
        }

        Ok(Some(new_count))
    }
376
    /// Backfills the per-relayer created_at sorted set from existing per-tx
    /// keys (pre-sorted-set data layout). Returns the number of transactions
    /// migrated.
    async fn migrate_relayer_transactions_to_sorted_set(
        &self,
        relayer_id: &str,
    ) -> Result<usize, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!(relayer_id = %relayer_id, "migrating old transactions to sorted set index");

        let pattern = format!(
            "{}:{}:{}:{}:*",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
        );
        let mut all_tx_ids = Vec::new();
        let mut cursor = 0;

        // Full SCAN pass collecting every matching tx key.
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .cursor_arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "migrate_relayer_transactions_scan"))?;

            for key in keys {
                // The tx id is the last ':'-separated key segment.
                // NOTE(review): assumes tx ids never contain ':' — confirm id format.
                if let Some(tx_id) = key.split(':').next_back() {
                    all_tx_ids.push(tx_id.to_string());
                }
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }

        if all_tx_ids.is_empty() {
            debug!(relayer_id = %relayer_id, "no transactions found to migrate");
            return Ok(0);
        }

        // Load the full models to recover each created_at for scoring.
        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let transactions = batch_result.results;

        if transactions.is_empty() {
            debug!(relayer_id = %relayer_id, "no valid transactions found to migrate");
            return Ok(0);
        }

        // One atomic pipeline of ZADDs builds the whole index.
        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
        let mut pipe = redis::pipe();
        pipe.atomic();

        for tx in &transactions {
            let created_at_score = self.created_at_to_score(&tx.created_at);
            pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
        }

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "migrate_relayer_transactions_pipeline"))?;

        let migrated_count = transactions.len();
        debug!(relayer_id = %relayer_id, count = %migrated_count, "successfully migrated transactions to sorted set");

        Ok(migrated_count)
    }
451
    /// SCAN-based fallback listing for a relayer when the sorted-set index is
    /// unavailable: collects all tx ids by key pattern, loads and sorts them
    /// newest-first in memory, then paginates the slice.
    ///
    /// NOTE(review): `query.page` is used as `page - 1`; a page value of 0
    /// would underflow if the field is unsigned — confirm callers guarantee
    /// page >= 1. `per_page == 0` is also not rejected here (it is in
    /// `list_paginated`).
    async fn find_by_relayer_id_fallback(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        let pattern = format!(
            "{}:{}:{}:{}:*",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
        );
        let mut all_tx_ids = Vec::new();
        let mut cursor = 0;

        // Full SCAN pass collecting every tx id for this relayer.
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .cursor_arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_fallback_scan"))?;

            for key in keys {
                // The tx id is the last ':'-separated key segment.
                if let Some(tx_id) = key.split(':').next_back() {
                    all_tx_ids.push(tx_id.to_string());
                }
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }

        // Sort newest-first (RFC 3339 strings compare chronologically).
        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut transactions = batch_result.results;
        transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        let total = transactions.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(transactions.len());

        if start >= transactions.len() {
            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let items = transactions[start..end].to_vec();

        debug!(relayer_id = %relayer_id, count = %items.len(), page = %query.page, "successfully fetched transactions for relayer using fallback method");

        Ok(PaginatedResult {
            items,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }
522}
523
524impl fmt::Debug for RedisTransactionRepository {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 f.debug_struct("RedisTransactionRepository")
527 .field("client", &"<ConnectionManager>")
528 .field("key_prefix", &self.key_prefix)
529 .finish()
530 }
531}
532
533#[async_trait]
534impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
535 async fn create(
536 &self,
537 entity: TransactionRepoModel,
538 ) -> Result<TransactionRepoModel, RepositoryError> {
539 if entity.id.is_empty() {
540 return Err(RepositoryError::InvalidData(
541 "Transaction ID cannot be empty".to_string(),
542 ));
543 }
544
545 let key = self.tx_key(&entity.relayer_id, &entity.id);
546 let reverse_key = self.tx_to_relayer_key(&entity.id);
547 let mut conn = self.client.as_ref().clone();
548
549 debug!(tx_id = %entity.id, "creating transaction");
550
551 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
552
553 let existing: Option<String> = conn
555 .get(&reverse_key)
556 .await
557 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
558
559 if existing.is_some() {
560 return Err(RepositoryError::ConstraintViolation(format!(
561 "Transaction with ID {} already exists",
562 entity.id
563 )));
564 }
565
566 let mut pipe = redis::pipe();
568 pipe.atomic();
569 pipe.set(&key, &value);
570 pipe.set(&reverse_key, &entity.relayer_id);
571
572 pipe.exec_async(&mut conn)
573 .await
574 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
575
576 if let Err(e) = self.update_indexes(&entity, None).await {
578 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
579 return Err(e);
580 }
581
582 debug!(tx_id = %entity.id, "successfully created transaction");
583 Ok(entity)
584 }
585
586 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
587 if id.is_empty() {
588 return Err(RepositoryError::InvalidData(
589 "Transaction ID cannot be empty".to_string(),
590 ));
591 }
592
593 let mut conn = self.client.as_ref().clone();
594
595 debug!(tx_id = %id, "fetching transaction");
596
597 let reverse_key = self.tx_to_relayer_key(&id);
598 let relayer_id: Option<String> = conn
599 .get(&reverse_key)
600 .await
601 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
602
603 let relayer_id = match relayer_id {
604 Some(relayer_id) => relayer_id,
605 None => {
606 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
607 return Err(RepositoryError::NotFound(format!(
608 "Transaction with ID {id} not found"
609 )));
610 }
611 };
612
613 let key = self.tx_key(&relayer_id, &id);
614 let value: Option<String> = conn
615 .get(&key)
616 .await
617 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
618
619 match value {
620 Some(json) => {
621 let tx =
622 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
623 debug!(tx_id = %id, "successfully fetched transaction");
624 Ok(tx)
625 }
626 None => {
627 debug!(tx_id = %id, "transaction not found");
628 Err(RepositoryError::NotFound(format!(
629 "Transaction with ID {id} not found"
630 )))
631 }
632 }
633 }
634
635 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
637 let mut conn = self.client.as_ref().clone();
638
639 debug!("fetching all transactions sorted by created_at (newest first)");
640
641 let relayer_list_key = self.relayer_list_key();
643 let relayer_ids: Vec<String> = conn
644 .smembers(&relayer_list_key)
645 .await
646 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
647
648 debug!(count = %relayer_ids.len(), "found relayers");
649
650 let mut all_transactions = Vec::new();
652 for relayer_id in relayer_ids {
653 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
654 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
655 .arg(&relayer_sorted_key)
656 .arg(0)
657 .arg(-1)
658 .arg("REV")
659 .query_async(&mut conn)
660 .await
661 .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
662
663 let batch_result = self.get_transactions_by_ids(&tx_ids).await?;
664 all_transactions.extend(batch_result.results);
665 }
666
667 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
669
670 debug!(count = %all_transactions.len(), "found transactions");
671 Ok(all_transactions)
672 }
673
    /// Lists all transactions across relayers, newest first, paginated.
    ///
    /// Loads the full result set into memory before slicing the requested
    /// page (required to merge-sort across per-relayer sorted sets).
    ///
    /// NOTE(review): `query.page` is used as `page - 1`; a page value of 0
    /// would underflow if the field is unsigned — confirm callers guarantee
    /// page >= 1.
    async fn list_paginated(
        &self,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        if query.per_page == 0 {
            return Err(RepositoryError::InvalidData(
                "per_page must be greater than 0".to_string(),
            ));
        }

        let mut conn = self.client.as_ref().clone();

        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");

        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;

        let mut all_transactions = Vec::new();
        for relayer_id in relayer_ids {
            // Newest-first ids from this relayer's sorted set.
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;

            let batch_result = self.get_transactions_by_ids(&tx_ids).await?;
            all_transactions.extend(batch_result.results);
        }

        // Re-sort after merging across relayers.
        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        let total = all_transactions.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_transactions.len());

        if start >= all_transactions.len() {
            debug!(page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let items = all_transactions[start..end].to_vec();

        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");

        Ok(PaginatedResult {
            items,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }
741
    /// Overwrites the stored transaction and refreshes its indexes, using the
    /// pre-update version (loaded first) to clean stale index entries.
    ///
    /// NOTE(review): the data key is derived from `entity.relayer_id`; if it
    /// differs from the stored transaction's relayer, the old key and the
    /// reverse mapping would be left stale — confirm relayer_id is immutable
    /// across updates.
    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "updating transaction");

        // Also serves as the existence check: NotFound propagates to caller.
        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self.client.as_ref().clone();

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!(tx_id = %id, "successfully updated transaction");
        Ok(entity)
    }
775
    /// Deletes the transaction key and its reverse mapping in one atomic
    /// pipeline, then removes the secondary indexes best-effort: an index
    /// removal failure is logged but not returned, since the primary data is
    /// already gone and the operation should still count as a success.
    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "deleting transaction");

        // Load first: needed for the relayer-scoped key and index cleanup.
        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self.client.as_ref().clone();

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
        }

        debug!(tx_id = %id, "successfully deleted transaction");
        Ok(())
    }
809
810 async fn count(&self) -> Result<usize, RepositoryError> {
812 let mut conn = self.client.as_ref().clone();
813
814 debug!("counting transactions");
815
816 let relayer_list_key = self.relayer_list_key();
818 let relayer_ids: Vec<String> = conn
819 .smembers(&relayer_list_key)
820 .await
821 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
822
823 let mut total_count = 0usize;
824 for relayer_id in relayer_ids {
825 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
826 let count: usize = conn
827 .zcard(&relayer_sorted_key)
828 .await
829 .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
830 total_count += count;
831 }
832
833 debug!(count = %total_count, "transaction count");
834 Ok(total_count)
835 }
836
837 async fn has_entries(&self) -> Result<bool, RepositoryError> {
838 let mut conn = self.client.as_ref().clone();
839 let relayer_list_key = self.relayer_list_key();
840
841 debug!("checking if transaction entries exist");
842
843 let exists: bool = conn
844 .exists(&relayer_list_key)
845 .await
846 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
847
848 debug!(exists = %exists, "transaction entries exist");
849 Ok(exists)
850 }
851
    /// Deletes every transaction, index, and bookkeeping key for all relayers
    /// in a single atomic pipeline (built up while SCANning outside of it).
    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let relayer_list_key = self.relayer_list_key();

        debug!("dropping all transaction entries");

        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("no transaction entries to drop");
            return Ok(());
        }

        let mut pipe = redis::pipe();
        pipe.atomic();

        for relayer_id in &relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            // SCAN all tx keys for this relayer, queueing each for deletion
            // and remembering the tx ids for reverse-key/status cleanup.
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                for key in keys {
                    pipe.del(&key);
                    // The tx id is the last ':'-separated key segment.
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                // NOTE(review): only these five statuses are cleaned; if
                // TransactionStatus has other variants, their index sets
                // would be left behind — confirm this list is exhaustive.
                for status in &[
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Canceled,
                ] {
                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }

            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
            pipe.del(&relayer_sorted_key);
        }

        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
        Ok(())
    }
939}
940
941#[async_trait]
942impl TransactionRepository for RedisTransactionRepository {
    /// Paginates a relayer's transactions newest-first using the created_at
    /// sorted set. If the sorted set is empty, attempts a one-time migration
    /// from the legacy layout; if there is nothing to migrate (or migration
    /// fails), falls back to the SCAN-based listing.
    async fn find_by_relayer_id(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);

        let mut sorted_set_count: u64 = conn
            .zcard(&relayer_sorted_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;

        // Empty index: either there are truly no transactions, or the data
        // predates the sorted-set index and needs migrating.
        if sorted_set_count == 0 {
            match self.check_and_migrate_if_needed(relayer_id).await {
                Ok(Some(count)) => {
                    sorted_set_count = count;
                }
                Ok(None) => {
                    return self.find_by_relayer_id_fallback(relayer_id, query).await;
                }
                Err(_) => {
                    warn!(relayer_id = %relayer_id, "migration failed, falling back to SCAN method");
                    return self.find_by_relayer_id_fallback(relayer_id, query).await;
                }
            }
        }

        let total = sorted_set_count;

        // ZRANGE bounds are inclusive, hence the `- 1` on `end`.
        let start = ((query.page - 1) * query.per_page) as isize;
        let end = start + query.per_page as isize - 1;

        if start as u64 >= total {
            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // REV walks the set from highest score (newest) to lowest.
        let page_ids: Vec<String> = redis::cmd("ZRANGE")
            .arg(&relayer_sorted_key)
            .arg(start)
            .arg(end)
            .arg("REV")
            .query_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;

        let items = self.get_transactions_by_ids(&page_ids).await?;

        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");

        Ok(PaginatedResult {
            items: items.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }
1017
1018 async fn find_by_status(
1019 &self,
1020 relayer_id: &str,
1021 statuses: &[TransactionStatus],
1022 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1023 let mut conn = self.client.as_ref().clone();
1024 let mut all_ids = Vec::new();
1025
1026 for status in statuses {
1028 let status_key = self.relayer_status_key(relayer_id, status);
1029 let ids: Vec<String> = conn
1030 .smembers(status_key)
1031 .await
1032 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1033
1034 all_ids.extend(ids);
1035 }
1036
1037 all_ids.sort();
1039 all_ids.dedup();
1040
1041 let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1042
1043 transactions
1045 .results
1046 .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1047
1048 Ok(transactions.results)
1049 }
1050
1051 async fn find_by_nonce(
1052 &self,
1053 relayer_id: &str,
1054 nonce: u64,
1055 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1056 let mut conn = self.client.as_ref().clone();
1057 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1058
1059 let tx_id: Option<String> = conn
1061 .get(nonce_key)
1062 .await
1063 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1064
1065 match tx_id {
1066 Some(tx_id) => {
1067 match self.get_by_id(tx_id.clone()).await {
1068 Ok(tx) => Ok(Some(tx)),
1069 Err(RepositoryError::NotFound(_)) => {
1070 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1072 Ok(None)
1073 }
1074 Err(e) => Err(e),
1075 }
1076 }
1077 None => Ok(None),
1078 }
1079 }
1080
1081 async fn update_status(
1082 &self,
1083 tx_id: String,
1084 status: TransactionStatus,
1085 ) -> Result<TransactionRepoModel, RepositoryError> {
1086 let update = TransactionUpdateRequest {
1087 status: Some(status),
1088 ..Default::default()
1089 };
1090 self.partial_update(tx_id, update).await
1091 }
1092
    /// Applies a partial update: loads the current model, merges the request,
    /// writes the result back, and refreshes indexes — retrying the write and
    /// index steps up to `MAX_RETRIES` times with a fixed backoff.
    ///
    /// NOTE(review): this is a non-atomic read-modify-write; concurrent
    /// partial updates to the same tx can interleave and lose fields —
    /// confirm callers serialize updates per transaction.
    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BACKOFF_MS: u64 = 100;

        // Read-merge once; only the Redis writes are retried below.
        let original_tx = self.get_by_id(tx_id.clone()).await?;
        let mut updated_tx = original_tx.clone();
        updated_tx.apply_partial_update(update.clone());

        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
        let value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            let mut conn = self.client.as_ref().clone();

            // Step 1: write the merged model. On the last attempt the error
            // is returned immediately instead of being retried.
            let result: Result<(), _> = conn.set(&key, &value).await;
            match result {
                Ok(_) => {}
                Err(e) => {
                    if attempt < MAX_RETRIES - 1 {
                        warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to set transaction data, retrying");
                        last_error = Some(self.map_redis_error(e, "partial_update"));
                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                        continue;
                    }
                    return Err(self.map_redis_error(e, "partial_update"));
                }
            }

            // Step 2: refresh indexes; a retry here re-runs the (idempotent)
            // SET from step 1 as well.
            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
                Ok(_) => {
                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
                    return Ok(updated_tx);
                }
                Err(e) if attempt < MAX_RETRIES - 1 => {
                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
                    last_error = Some(e);
                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // Unreachable in practice (the loop always returns), kept as a guard.
        Err(last_error.unwrap_or_else(|| {
            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
        }))
    }
1154
1155 async fn update_network_data(
1156 &self,
1157 tx_id: String,
1158 network_data: NetworkTransactionData,
1159 ) -> Result<TransactionRepoModel, RepositoryError> {
1160 let update = TransactionUpdateRequest {
1161 network_data: Some(network_data),
1162 ..Default::default()
1163 };
1164 self.partial_update(tx_id, update).await
1165 }
1166
1167 async fn set_sent_at(
1168 &self,
1169 tx_id: String,
1170 sent_at: String,
1171 ) -> Result<TransactionRepoModel, RepositoryError> {
1172 let update = TransactionUpdateRequest {
1173 sent_at: Some(sent_at),
1174 ..Default::default()
1175 };
1176 self.partial_update(tx_id, update).await
1177 }
1178
1179 async fn set_confirmed_at(
1180 &self,
1181 tx_id: String,
1182 confirmed_at: String,
1183 ) -> Result<TransactionRepoModel, RepositoryError> {
1184 let update = TransactionUpdateRequest {
1185 confirmed_at: Some(confirmed_at),
1186 ..Default::default()
1187 };
1188 self.partial_update(tx_id, update).await
1189 }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194 use super::*;
1195 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
1196 use alloy::primitives::U256;
1197 use lazy_static::lazy_static;
1198 use redis::Client;
1199 use std::str::FromStr;
1200 use tokio;
1201 use uuid::Uuid;
1202
1203 use tokio::sync::Mutex;
1204
    lazy_static! {
        // Serializes tests that mutate process-wide environment variables
        // (e.g. TRANSACTION_EXPIRATION_HOURS) so they cannot interleave.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
1209
    /// Builds a baseline transaction fixture: EVM network data, owned by
    /// "relayer-1", status `Pending`, with `id` reused inside the fake hash.
    fn create_test_transaction(id: &str) -> TransactionRepoModel {
        TransactionRepoModel {
            id: id.to_string(),
            relayer_id: "relayer-1".to_string(),
            status: TransactionStatus::Pending,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            valid_until: None,
            delete_at: None,
            network_type: NetworkType::Evm,
            priced_at: None,
            hashes: vec![],
            network_data: NetworkTransactionData::Evm(EvmTransactionData {
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from_str("1000000000000000000").unwrap(),
                data: Some("0x".to_string()),
                from: "0xSender".to_string(),
                to: Some("0xRecipient".to_string()),
                chain_id: 1,
                signature: None,
                hash: Some(format!("0x{}", id)),
                speed: Some(Speed::Fast),
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                raw: None,
            }),
            noop_count: None,
            is_canceled: Some(false),
        }
    }
1245
1246 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1247 let mut tx = create_test_transaction(id);
1248 tx.relayer_id = relayer_id.to_string();
1249 tx
1250 }
1251
1252 fn create_test_transaction_with_status(
1253 id: &str,
1254 relayer_id: &str,
1255 status: TransactionStatus,
1256 ) -> TransactionRepoModel {
1257 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1258 tx.status = status;
1259 tx
1260 }
1261
1262 fn create_test_transaction_with_nonce(
1263 id: &str,
1264 nonce: u64,
1265 relayer_id: &str,
1266 ) -> TransactionRepoModel {
1267 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1268 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1269 evm_data.nonce = Some(nonce);
1270 }
1271 tx
1272 }
1273
1274 async fn setup_test_repo() -> RedisTransactionRepository {
1275 let redis_url = std::env::var("REDIS_TEST_URL")
1277 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1278
1279 let client = Client::open(redis_url).expect("Failed to create Redis client");
1280 let connection_manager = ConnectionManager::new(client)
1281 .await
1282 .expect("Failed to create connection manager");
1283
1284 let random_id = Uuid::new_v4().to_string();
1285 let key_prefix = format!("test_prefix:{}", random_id);
1286
1287 RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1288 .expect("Failed to create RedisTransactionRepository")
1289 }
1290
1291 #[tokio::test]
1292 #[ignore = "Requires active Redis instance"]
1293 async fn test_new_repository_creation() {
1294 let repo = setup_test_repo().await;
1295 assert!(repo.key_prefix.contains("test_prefix"));
1296 }
1297
1298 #[tokio::test]
1299 #[ignore = "Requires active Redis instance"]
1300 async fn test_new_repository_empty_prefix_fails() {
1301 let redis_url = std::env::var("REDIS_TEST_URL")
1302 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1303 let client = Client::open(redis_url).expect("Failed to create Redis client");
1304 let connection_manager = ConnectionManager::new(client)
1305 .await
1306 .expect("Failed to create connection manager");
1307
1308 let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1309 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1310 }
1311
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        // Each key-builder must embed the "<prefix>:relayer:..." segments that
        // make up the repository's Redis key layout.
        let repo = setup_test_repo().await;

        assert!(repo
            .tx_key("relayer-1", "test-id")
            .contains(":relayer:relayer-1:tx:test-id"));
        assert!(repo
            .tx_to_relayer_key("test-id")
            .contains(":relayer:tx_to_relayer:test-id"));
        assert!(repo.relayer_list_key().contains(":relayer_list"));
        assert!(repo
            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
            .contains(":relayer:relayer-1:status:Pending"));
        assert!(repo
            .relayer_nonce_key("relayer-1", 42)
            .contains(":relayer:relayer-1:nonce:42"));
    }
1331
1332 #[tokio::test]
1333 #[ignore = "Requires active Redis instance"]
1334 async fn test_serialize_deserialize_transaction() {
1335 let repo = setup_test_repo().await;
1336 let tx = create_test_transaction("test-1");
1337
1338 let serialized = repo
1339 .serialize_entity(&tx, |t| &t.id, "transaction")
1340 .expect("Serialization should succeed");
1341 let deserialized: TransactionRepoModel = repo
1342 .deserialize_entity(&serialized, "test-1", "transaction")
1343 .expect("Deserialization should succeed");
1344
1345 assert_eq!(tx.id, deserialized.id);
1346 assert_eq!(tx.relayer_id, deserialized.relayer_id);
1347 assert_eq!(tx.status, deserialized.status);
1348 }
1349
1350 #[tokio::test]
1351 #[ignore = "Requires active Redis instance"]
1352 async fn test_extract_nonce() {
1353 let repo = setup_test_repo().await;
1354 let random_id = Uuid::new_v4().to_string();
1355 let relayer_id = Uuid::new_v4().to_string();
1356 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1357
1358 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1359 assert_eq!(nonce, Some(42));
1360 }
1361
1362 #[tokio::test]
1363 #[ignore = "Requires active Redis instance"]
1364 async fn test_create_transaction() {
1365 let repo = setup_test_repo().await;
1366 let random_id = Uuid::new_v4().to_string();
1367 let tx = create_test_transaction(&random_id);
1368
1369 let result = repo.create(tx.clone()).await.unwrap();
1370 assert_eq!(result.id, tx.id);
1371 }
1372
1373 #[tokio::test]
1374 #[ignore = "Requires active Redis instance"]
1375 async fn test_get_transaction() {
1376 let repo = setup_test_repo().await;
1377 let random_id = Uuid::new_v4().to_string();
1378 let tx = create_test_transaction(&random_id);
1379
1380 repo.create(tx.clone()).await.unwrap();
1381 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1382 assert_eq!(stored.id, tx.id);
1383 assert_eq!(stored.relayer_id, tx.relayer_id);
1384 }
1385
1386 #[tokio::test]
1387 #[ignore = "Requires active Redis instance"]
1388 async fn test_update_transaction() {
1389 let repo = setup_test_repo().await;
1390 let random_id = Uuid::new_v4().to_string();
1391 let mut tx = create_test_transaction(&random_id);
1392
1393 repo.create(tx.clone()).await.unwrap();
1394 tx.status = TransactionStatus::Confirmed;
1395
1396 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1397 assert!(matches!(updated.status, TransactionStatus::Confirmed));
1398 }
1399
1400 #[tokio::test]
1401 #[ignore = "Requires active Redis instance"]
1402 async fn test_delete_transaction() {
1403 let repo = setup_test_repo().await;
1404 let random_id = Uuid::new_v4().to_string();
1405 let tx = create_test_transaction(&random_id);
1406
1407 repo.create(tx).await.unwrap();
1408 repo.delete_by_id(random_id.to_string()).await.unwrap();
1409
1410 let result = repo.get_by_id(random_id.to_string()).await;
1411 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1412 }
1413
1414 #[tokio::test]
1415 #[ignore = "Requires active Redis instance"]
1416 async fn test_list_all_transactions() {
1417 let repo = setup_test_repo().await;
1418 let random_id = Uuid::new_v4().to_string();
1419 let random_id2 = Uuid::new_v4().to_string();
1420
1421 let tx1 = create_test_transaction(&random_id);
1422 let tx2 = create_test_transaction(&random_id2);
1423
1424 repo.create(tx1).await.unwrap();
1425 repo.create(tx2).await.unwrap();
1426
1427 let transactions = repo.list_all().await.unwrap();
1428 assert!(transactions.len() >= 2);
1429 }
1430
1431 #[tokio::test]
1432 #[ignore = "Requires active Redis instance"]
1433 async fn test_count_transactions() {
1434 let repo = setup_test_repo().await;
1435 let random_id = Uuid::new_v4().to_string();
1436 let tx = create_test_transaction(&random_id);
1437
1438 let count = repo.count().await.unwrap();
1439 repo.create(tx).await.unwrap();
1440 assert!(repo.count().await.unwrap() > count);
1441 }
1442
1443 #[tokio::test]
1444 #[ignore = "Requires active Redis instance"]
1445 async fn test_get_nonexistent_transaction() {
1446 let repo = setup_test_repo().await;
1447 let result = repo.get_by_id("nonexistent".to_string()).await;
1448 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1449 }
1450
1451 #[tokio::test]
1452 #[ignore = "Requires active Redis instance"]
1453 async fn test_duplicate_transaction_creation() {
1454 let repo = setup_test_repo().await;
1455 let random_id = Uuid::new_v4().to_string();
1456
1457 let tx = create_test_transaction(&random_id);
1458
1459 repo.create(tx.clone()).await.unwrap();
1460 let result = repo.create(tx).await;
1461
1462 assert!(matches!(
1463 result,
1464 Err(RepositoryError::ConstraintViolation(_))
1465 ));
1466 }
1467
1468 #[tokio::test]
1469 #[ignore = "Requires active Redis instance"]
1470 async fn test_update_nonexistent_transaction() {
1471 let repo = setup_test_repo().await;
1472 let tx = create_test_transaction("test-1");
1473
1474 let result = repo.update("nonexistent".to_string(), tx).await;
1475 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1476 }
1477
1478 #[tokio::test]
1479 #[ignore = "Requires active Redis instance"]
1480 async fn test_list_paginated() {
1481 let repo = setup_test_repo().await;
1482
1483 for _ in 1..=10 {
1485 let random_id = Uuid::new_v4().to_string();
1486 let tx = create_test_transaction(&random_id);
1487 repo.create(tx).await.unwrap();
1488 }
1489
1490 let query = PaginationQuery {
1492 page: 1,
1493 per_page: 3,
1494 };
1495 let result = repo.list_paginated(query).await.unwrap();
1496 assert_eq!(result.items.len(), 3);
1497 assert!(result.total >= 10);
1498 assert_eq!(result.page, 1);
1499 assert_eq!(result.per_page, 3);
1500
1501 let query = PaginationQuery {
1503 page: 1000,
1504 per_page: 3,
1505 };
1506 let result = repo.list_paginated(query).await.unwrap();
1507 assert_eq!(result.items.len(), 0);
1508 }
1509
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_relayer_id() {
        // Lookups must be scoped per relayer, and unknown relayers must yield
        // an empty page rather than an error.
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let random_id3 = Uuid::new_v4().to_string();

        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        // relayer-1 owns two of the three transactions.
        let result = repo
            .find_by_relayer_id("relayer-1", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 2);
        assert!(result.items.len() >= 2);
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));

        // relayer-2 owns the remaining one.
        let result = repo
            .find_by_relayer_id("relayer-2", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 1);
        assert!(!result.items.is_empty());
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));

        // Unknown relayer: empty result set, no error.
        let result = repo
            .find_by_relayer_id("non-existent", query.clone())
            .await
            .unwrap();
        assert_eq!(result.total, 0);
        assert_eq!(result.items.len(), 0);
    }
1556
1557 #[tokio::test]
1558 #[ignore = "Requires active Redis instance"]
1559 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
1560 let repo = setup_test_repo().await;
1561 let relayer_id = Uuid::new_v4().to_string();
1562
1563 let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
1565 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
1568 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
1571 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
1579 page: 1,
1580 per_page: 10,
1581 };
1582 let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
1583
1584 assert_eq!(result.total, 3);
1585 assert_eq!(result.items.len(), 3);
1586
1587 assert_eq!(
1589 result.items[0].id, "test-3",
1590 "First item should be newest (test-3)"
1591 );
1592 assert_eq!(
1593 result.items[0].created_at,
1594 "2025-01-27T14:00:00.000000+00:00"
1595 );
1596
1597 assert_eq!(
1598 result.items[1].id, "test-2",
1599 "Second item should be middle (test-2)"
1600 );
1601 assert_eq!(
1602 result.items[1].created_at,
1603 "2025-01-27T12:00:00.000000+00:00"
1604 );
1605
1606 assert_eq!(
1607 result.items[2].id, "test-1",
1608 "Third item should be oldest (test-1)"
1609 );
1610 assert_eq!(
1611 result.items[2].created_at,
1612 "2025-01-27T10:00:00.000000+00:00"
1613 );
1614 }
1615
1616 #[tokio::test]
1617 #[ignore = "Requires active Redis instance"]
1618 async fn test_find_by_relayer_id_migration_from_old_index() {
1619 let repo = setup_test_repo().await;
1620 let relayer_id = Uuid::new_v4().to_string();
1621
1622 let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
1624 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
1627 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
1630 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); let mut conn = repo.client.as_ref().clone();
1635 let relayer_list_key = repo.relayer_list_key();
1636 let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();
1637
1638 for tx in &[&tx1, &tx2, &tx3] {
1639 let key = repo.tx_key(&tx.relayer_id, &tx.id);
1640 let reverse_key = repo.tx_to_relayer_key(&tx.id);
1641 let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();
1642
1643 let mut pipe = redis::pipe();
1644 pipe.atomic();
1645 pipe.set(&key, &value);
1646 pipe.set(&reverse_key, &tx.relayer_id);
1647
1648 let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
1650 pipe.sadd(&status_key, &tx.id);
1651
1652 pipe.exec_async(&mut conn).await.unwrap();
1653 }
1654
1655 let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
1657 let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
1658 assert_eq!(count, 0, "Sorted set should be empty for old transactions");
1659
1660 let query = PaginationQuery {
1662 page: 1,
1663 per_page: 10,
1664 };
1665 let result = repo
1666 .find_by_relayer_id(&relayer_id, query.clone())
1667 .await
1668 .unwrap();
1669
1670 let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
1672 assert_eq!(
1673 count_after, 3,
1674 "Sorted set should be populated after migration"
1675 );
1676
1677 assert_eq!(result.total, 3);
1679 assert_eq!(result.items.len(), 3);
1680
1681 assert_eq!(
1682 result.items[0].id, "migrate-test-3",
1683 "First item should be newest after migration"
1684 );
1685 assert_eq!(
1686 result.items[0].created_at,
1687 "2025-01-27T14:00:00.000000+00:00"
1688 );
1689
1690 assert_eq!(
1691 result.items[1].id, "migrate-test-2",
1692 "Second item should be middle after migration"
1693 );
1694 assert_eq!(
1695 result.items[1].created_at,
1696 "2025-01-27T12:00:00.000000+00:00"
1697 );
1698
1699 assert_eq!(
1700 result.items[2].id, "migrate-test-1",
1701 "Third item should be oldest after migration"
1702 );
1703 assert_eq!(
1704 result.items[2].created_at,
1705 "2025-01-27T10:00:00.000000+00:00"
1706 );
1707
1708 let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
1710 assert_eq!(result2.total, 3);
1711 assert_eq!(result2.items.len(), 3);
1712 assert_eq!(result.items[0].id, result2.items[0].id);
1714 }
1715
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status() {
        // One transaction per status; find_by_status must honor single- and
        // multi-status filters and return empty for absent statuses.
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let random_id3 = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx1 = create_test_transaction_with_status(
            &random_id,
            &relayer_id,
            TransactionStatus::Pending,
        );
        let tx2 =
            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
        let tx3 = create_test_transaction_with_status(
            &random_id3,
            &relayer_id,
            TransactionStatus::Confirmed,
        );

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        // Single-status filter.
        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
            .await
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].status, TransactionStatus::Pending);

        // Multi-status filter unions the matches.
        let result = repo
            .find_by_status(
                &relayer_id,
                &[TransactionStatus::Pending, TransactionStatus::Sent],
            )
            .await
            .unwrap();
        assert_eq!(result.len(), 2);

        // No transaction has Failed status for this relayer.
        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }
1766
1767 #[tokio::test]
1768 #[ignore = "Requires active Redis instance"]
1769 async fn test_find_by_nonce() {
1770 let repo = setup_test_repo().await;
1771 let random_id = Uuid::new_v4().to_string();
1772 let random_id2 = Uuid::new_v4().to_string();
1773 let relayer_id = Uuid::new_v4().to_string();
1774
1775 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1776 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1777
1778 repo.create(tx1.clone()).await.unwrap();
1779 repo.create(tx2).await.unwrap();
1780
1781 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1783 assert!(result.is_some());
1784 assert_eq!(result.unwrap().id, random_id);
1785
1786 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1788 assert!(result.is_none());
1789
1790 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1792 assert!(result.is_none());
1793 }
1794
1795 #[tokio::test]
1796 #[ignore = "Requires active Redis instance"]
1797 async fn test_update_status() {
1798 let repo = setup_test_repo().await;
1799 let random_id = Uuid::new_v4().to_string();
1800 let tx = create_test_transaction(&random_id);
1801
1802 repo.create(tx).await.unwrap();
1803 let updated = repo
1804 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1805 .await
1806 .unwrap();
1807 assert_eq!(updated.status, TransactionStatus::Confirmed);
1808 }
1809
1810 #[tokio::test]
1811 #[ignore = "Requires active Redis instance"]
1812 async fn test_partial_update() {
1813 let repo = setup_test_repo().await;
1814 let random_id = Uuid::new_v4().to_string();
1815 let tx = create_test_transaction(&random_id);
1816
1817 repo.create(tx).await.unwrap();
1818
1819 let update = TransactionUpdateRequest {
1820 status: Some(TransactionStatus::Sent),
1821 status_reason: Some("Transaction sent".to_string()),
1822 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1823 confirmed_at: None,
1824 network_data: None,
1825 hashes: None,
1826 is_canceled: None,
1827 priced_at: None,
1828 noop_count: None,
1829 delete_at: None,
1830 };
1831
1832 let updated = repo
1833 .partial_update(random_id.to_string(), update)
1834 .await
1835 .unwrap();
1836 assert_eq!(updated.status, TransactionStatus::Sent);
1837 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1838 assert_eq!(
1839 updated.sent_at,
1840 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1841 );
1842 }
1843
1844 #[tokio::test]
1845 #[ignore = "Requires active Redis instance"]
1846 async fn test_set_sent_at() {
1847 let repo = setup_test_repo().await;
1848 let random_id = Uuid::new_v4().to_string();
1849 let tx = create_test_transaction(&random_id);
1850
1851 repo.create(tx).await.unwrap();
1852 let updated = repo
1853 .set_sent_at(
1854 random_id.to_string(),
1855 "2025-01-27T16:00:00.000000+00:00".to_string(),
1856 )
1857 .await
1858 .unwrap();
1859 assert_eq!(
1860 updated.sent_at,
1861 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1862 );
1863 }
1864
1865 #[tokio::test]
1866 #[ignore = "Requires active Redis instance"]
1867 async fn test_set_confirmed_at() {
1868 let repo = setup_test_repo().await;
1869 let random_id = Uuid::new_v4().to_string();
1870 let tx = create_test_transaction(&random_id);
1871
1872 repo.create(tx).await.unwrap();
1873 let updated = repo
1874 .set_confirmed_at(
1875 random_id.to_string(),
1876 "2025-01-27T16:00:00.000000+00:00".to_string(),
1877 )
1878 .await
1879 .unwrap();
1880 assert_eq!(
1881 updated.confirmed_at,
1882 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1883 );
1884 }
1885
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_network_data() {
        // Swapping the whole network payload must persist the new values
        // (verified here via the tx hash).
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

        // A payload that differs from the fixture in every mutable field.
        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
            gas_price: Some(2000000000),
            gas_limit: Some(42000),
            nonce: Some(2),
            value: U256::from_str("2000000000000000000").unwrap(),
            data: Some("0x1234".to_string()),
            from: "0xNewSender".to_string(),
            to: Some("0xNewRecipient".to_string()),
            chain_id: 1,
            signature: None,
            hash: Some("0xnewhash".to_string()),
            speed: Some(Speed::SafeLow),
            max_fee_per_gas: None,
            max_priority_fee_per_gas: None,
            raw: None,
        });

        let updated = repo
            .update_network_data(random_id.to_string(), new_network_data.clone())
            .await
            .unwrap();
        assert_eq!(
            updated
                .network_data
                .get_evm_transaction_data()
                .unwrap()
                .hash,
            new_network_data.get_evm_transaction_data().unwrap().hash
        );
    }
1925
1926 #[tokio::test]
1927 #[ignore = "Requires active Redis instance"]
1928 async fn test_debug_implementation() {
1929 let repo = setup_test_repo().await;
1930 let debug_str = format!("{:?}", repo);
1931 assert!(debug_str.contains("RedisTransactionRepository"));
1932 assert!(debug_str.contains("test_prefix"));
1933 }
1934
1935 #[tokio::test]
1936 #[ignore = "Requires active Redis instance"]
1937 async fn test_error_handling_empty_id() {
1938 let repo = setup_test_repo().await;
1939
1940 let result = repo.get_by_id("".to_string()).await;
1941 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1942
1943 let result = repo
1944 .update("".to_string(), create_test_transaction("test"))
1945 .await;
1946 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1947
1948 let result = repo.delete_by_id("".to_string()).await;
1949 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1950 }
1951
1952 #[tokio::test]
1953 #[ignore = "Requires active Redis instance"]
1954 async fn test_pagination_validation() {
1955 let repo = setup_test_repo().await;
1956
1957 let query = PaginationQuery {
1958 page: 1,
1959 per_page: 0,
1960 };
1961 let result = repo.list_paginated(query).await;
1962 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1963 }
1964
1965 #[tokio::test]
1966 #[ignore = "Requires active Redis instance"]
1967 async fn test_index_consistency() {
1968 let repo = setup_test_repo().await;
1969 let random_id = Uuid::new_v4().to_string();
1970 let relayer_id = Uuid::new_v4().to_string();
1971 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1972
1973 repo.create(tx.clone()).await.unwrap();
1975
1976 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1978 assert!(found.is_some());
1979
1980 let mut updated_tx = tx.clone();
1982 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1983 evm_data.nonce = Some(43);
1984 }
1985
1986 repo.update(random_id.to_string(), updated_tx)
1987 .await
1988 .unwrap();
1989
1990 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1992 assert!(old_nonce_result.is_none());
1993
1994 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1996 assert!(new_nonce_result.is_some());
1997 }
1998
1999 #[tokio::test]
2000 #[ignore = "Requires active Redis instance"]
2001 async fn test_has_entries() {
2002 let repo = setup_test_repo().await;
2003 assert!(!repo.has_entries().await.unwrap());
2004
2005 let tx_id = uuid::Uuid::new_v4().to_string();
2006 let tx = create_test_transaction(&tx_id);
2007 repo.create(tx.clone()).await.unwrap();
2008
2009 assert!(repo.has_entries().await.unwrap());
2010 }
2011
2012 #[tokio::test]
2013 #[ignore = "Requires active Redis instance"]
2014 async fn test_drop_all_entries() {
2015 let repo = setup_test_repo().await;
2016 let tx_id = uuid::Uuid::new_v4().to_string();
2017 let tx = create_test_transaction(&tx_id);
2018 repo.create(tx.clone()).await.unwrap();
2019 assert!(repo.has_entries().await.unwrap());
2020
2021 repo.drop_all_entries().await.unwrap();
2022 assert!(!repo.has_entries().await.unwrap());
2023 }
2024
2025 #[tokio::test]
2027 #[ignore = "Requires active Redis instance"]
2028 async fn test_update_status_sets_delete_at_for_final_statuses() {
2029 let _lock = ENV_MUTEX.lock().await;
2030
2031 use chrono::{DateTime, Duration, Utc};
2032 use std::env;
2033
2034 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
2036
2037 let repo = setup_test_repo().await;
2038
2039 let final_statuses = [
2040 TransactionStatus::Canceled,
2041 TransactionStatus::Confirmed,
2042 TransactionStatus::Failed,
2043 TransactionStatus::Expired,
2044 ];
2045
2046 for (i, status) in final_statuses.iter().enumerate() {
2047 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
2048 let mut tx = create_test_transaction(&tx_id);
2049
2050 tx.delete_at = None;
2052 tx.status = TransactionStatus::Pending;
2053
2054 repo.create(tx).await.unwrap();
2055
2056 let before_update = Utc::now();
2057
2058 let updated = repo
2060 .update_status(tx_id.clone(), status.clone())
2061 .await
2062 .unwrap();
2063
2064 assert!(
2066 updated.delete_at.is_some(),
2067 "delete_at should be set for status: {:?}",
2068 status
2069 );
2070
2071 let delete_at_str = updated.delete_at.unwrap();
2073 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2074 .expect("delete_at should be valid RFC3339")
2075 .with_timezone(&Utc);
2076
2077 let duration_from_before = delete_at.signed_duration_since(before_update);
2078 let expected_duration = Duration::hours(6);
2079 let tolerance = Duration::minutes(5);
2080
2081 assert!(
2082 duration_from_before >= expected_duration - tolerance &&
2083 duration_from_before <= expected_duration + tolerance,
2084 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
2085 status, duration_from_before
2086 );
2087 }
2088
2089 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2091 }
2092
2093 #[tokio::test]
2094 #[ignore = "Requires active Redis instance"]
2095 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
2096 let _lock = ENV_MUTEX.lock().await;
2097
2098 use std::env;
2099
2100 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
2101
2102 let repo = setup_test_repo().await;
2103
2104 let non_final_statuses = [
2105 TransactionStatus::Pending,
2106 TransactionStatus::Sent,
2107 TransactionStatus::Submitted,
2108 TransactionStatus::Mined,
2109 ];
2110
2111 for (i, status) in non_final_statuses.iter().enumerate() {
2112 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
2113 let mut tx = create_test_transaction(&tx_id);
2114 tx.delete_at = None;
2115 tx.status = TransactionStatus::Pending;
2116
2117 repo.create(tx).await.unwrap();
2118
2119 let updated = repo
2121 .update_status(tx_id.clone(), status.clone())
2122 .await
2123 .unwrap();
2124
2125 assert!(
2127 updated.delete_at.is_none(),
2128 "delete_at should NOT be set for status: {:?}",
2129 status
2130 );
2131 }
2132
2133 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2135 }
2136
2137 #[tokio::test]
2138 #[ignore = "Requires active Redis instance"]
2139 async fn test_partial_update_sets_delete_at_for_final_statuses() {
2140 let _lock = ENV_MUTEX.lock().await;
2141
2142 use chrono::{DateTime, Duration, Utc};
2143 use std::env;
2144
2145 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
2146
2147 let repo = setup_test_repo().await;
2148 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
2149 let mut tx = create_test_transaction(&tx_id);
2150 tx.delete_at = None;
2151 tx.status = TransactionStatus::Pending;
2152
2153 repo.create(tx).await.unwrap();
2154
2155 let before_update = Utc::now();
2156
2157 let update = TransactionUpdateRequest {
2159 status: Some(TransactionStatus::Confirmed),
2160 status_reason: Some("Transaction completed".to_string()),
2161 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
2162 ..Default::default()
2163 };
2164
2165 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
2166
2167 assert!(
2169 updated.delete_at.is_some(),
2170 "delete_at should be set when updating to Confirmed status"
2171 );
2172
2173 let delete_at_str = updated.delete_at.unwrap();
2175 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2176 .expect("delete_at should be valid RFC3339")
2177 .with_timezone(&Utc);
2178
2179 let duration_from_before = delete_at.signed_duration_since(before_update);
2180 let expected_duration = Duration::hours(8);
2181 let tolerance = Duration::minutes(5);
2182
2183 assert!(
2184 duration_from_before >= expected_duration - tolerance
2185 && duration_from_before <= expected_duration + tolerance,
2186 "delete_at should be approximately 8 hours from now. Duration: {:?}",
2187 duration_from_before
2188 );
2189
2190 assert_eq!(updated.status, TransactionStatus::Confirmed);
2192 assert_eq!(
2193 updated.status_reason,
2194 Some("Transaction completed".to_string())
2195 );
2196 assert_eq!(
2197 updated.confirmed_at,
2198 Some("2023-01-01T12:05:00Z".to_string())
2199 );
2200
2201 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2203 }
2204
2205 #[tokio::test]
2206 #[ignore = "Requires active Redis instance"]
2207 async fn test_update_status_preserves_existing_delete_at() {
2208 let _lock = ENV_MUTEX.lock().await;
2209
2210 use std::env;
2211
2212 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
2213
2214 let repo = setup_test_repo().await;
2215 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
2216 let mut tx = create_test_transaction(&tx_id);
2217
2218 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
2220 tx.delete_at = Some(existing_delete_at.clone());
2221 tx.status = TransactionStatus::Pending;
2222
2223 repo.create(tx).await.unwrap();
2224
2225 let updated = repo
2227 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
2228 .await
2229 .unwrap();
2230
2231 assert_eq!(
2233 updated.delete_at,
2234 Some(existing_delete_at),
2235 "Existing delete_at should be preserved when updating to final status"
2236 );
2237
2238 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2240 }
2241 #[tokio::test]
2242 #[ignore = "Requires active Redis instance"]
2243 async fn test_partial_update_without_status_change_preserves_delete_at() {
2244 let _lock = ENV_MUTEX.lock().await;
2245
2246 use std::env;
2247
2248 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
2249
2250 let repo = setup_test_repo().await;
2251 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
2252 let mut tx = create_test_transaction(&tx_id);
2253 tx.delete_at = None;
2254 tx.status = TransactionStatus::Pending;
2255
2256 repo.create(tx).await.unwrap();
2257
2258 let updated1 = repo
2260 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
2261 .await
2262 .unwrap();
2263
2264 assert!(updated1.delete_at.is_some());
2265 let original_delete_at = updated1.delete_at.clone();
2266
2267 let update = TransactionUpdateRequest {
2269 status: None, status_reason: Some("Updated reason".to_string()),
2271 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
2272 ..Default::default()
2273 };
2274
2275 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
2276
2277 assert_eq!(
2279 updated2.delete_at, original_delete_at,
2280 "delete_at should be preserved when status is not updated"
2281 );
2282
2283 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
2286 assert_eq!(
2287 updated2.confirmed_at,
2288 Some("2023-01-01T12:10:00Z".to_string())
2289 );
2290
2291 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2293 }
2294}