openzeppelin_relayer/repositories/relayer/
relayer_redis.rs

1//! Redis-backed implementation of the RelayerRepository.
2
3use crate::models::UpdateRelayerRequest;
4use crate::models::{
5    DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{BatchRetrievalResult, PaginatedResult, RelayerRepository, Repository};
9use async_trait::async_trait;
10use redis::aio::ConnectionManager;
11use redis::AsyncCommands;
12use std::fmt;
13use std::sync::Arc;
14use tracing::{debug, error, warn};
15
16const RELAYER_PREFIX: &str = "relayer";
17const RELAYER_LIST_KEY: &str = "relayer_list";
18
19#[derive(Clone)]
20pub struct RedisRelayerRepository {
21    pub client: Arc<ConnectionManager>,
22    pub key_prefix: String,
23}
24
25impl RedisRepository for RedisRelayerRepository {}
26
27impl RedisRelayerRepository {
28    pub fn new(
29        connection_manager: Arc<ConnectionManager>,
30        key_prefix: String,
31    ) -> Result<Self, RepositoryError> {
32        if key_prefix.is_empty() {
33            return Err(RepositoryError::InvalidData(
34                "Redis key prefix cannot be empty".to_string(),
35            ));
36        }
37
38        Ok(Self {
39            client: connection_manager,
40            key_prefix,
41        })
42    }
43
44    /// Generate key for relayer data: relayer:{relayer_id}
45    fn relayer_key(&self, relayer_id: &str) -> String {
46        format!("{}:{}:{}", self.key_prefix, RELAYER_PREFIX, relayer_id)
47    }
48
49    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
50    fn relayer_list_key(&self) -> String {
51        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
52    }
53
54    /// Batch fetch relayers by IDs
55    async fn get_relayers_by_ids(
56        &self,
57        ids: &[String],
58    ) -> Result<BatchRetrievalResult<RelayerRepoModel>, RepositoryError> {
59        if ids.is_empty() {
60            debug!("no relayer IDs provided for batch fetch");
61            return Ok(BatchRetrievalResult {
62                results: vec![],
63                failed_ids: vec![],
64            });
65        }
66
67        let mut conn = self.client.as_ref().clone();
68        let keys: Vec<String> = ids.iter().map(|id| self.relayer_key(id)).collect();
69
70        debug!(count = %keys.len(), "batch fetching relayer data");
71
72        let values: Vec<Option<String>> = conn
73            .mget(&keys)
74            .await
75            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayers"))?;
76
77        let mut relayers = Vec::new();
78        let mut failed_count = 0;
79        let mut failed_ids = Vec::new();
80        for (i, value) in values.into_iter().enumerate() {
81            match value {
82                Some(json) => {
83                    match self.deserialize_entity(&json, &ids[i], "relayer") {
84                        Ok(relayer) => relayers.push(relayer),
85                        Err(e) => {
86                            failed_count += 1;
87                            error!(relayer_id = %ids[i], error = %e, "failed to deserialize relayer");
88                            failed_ids.push(ids[i].clone());
89                            // Continue processing other relayers
90                        }
91                    }
92                }
93                None => {
94                    warn!(relayer_id = %ids[i], "relayer not found in batch fetch");
95                }
96            }
97        }
98
99        if failed_count > 0 {
100            warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize relayers in batch");
101        }
102
103        debug!(count = %relayers.len(), "successfully fetched relayers");
104        Ok(BatchRetrievalResult {
105            results: relayers,
106            failed_ids,
107        })
108    }
109}
110
111impl fmt::Debug for RedisRelayerRepository {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        f.debug_struct("RedisRelayerRepository")
114            .field("client", &"<ConnectionManager>")
115            .field("key_prefix", &self.key_prefix)
116            .finish()
117    }
118}
119
120#[async_trait]
121impl Repository<RelayerRepoModel, String> for RedisRelayerRepository {
122    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
123        if entity.id.is_empty() {
124            return Err(RepositoryError::InvalidData(
125                "Relayer ID cannot be empty".to_string(),
126            ));
127        }
128
129        if entity.name.is_empty() {
130            return Err(RepositoryError::InvalidData(
131                "Relayer name cannot be empty".to_string(),
132            ));
133        }
134
135        let mut conn = self.client.as_ref().clone();
136        let relayer_key = self.relayer_key(&entity.id);
137
138        // Check if relayer already exists
139        let exists: bool = conn
140            .exists(&relayer_key)
141            .await
142            .map_err(|e| self.map_redis_error(e, "create_relayer_exists_check"))?;
143
144        if exists {
145            return Err(RepositoryError::ConstraintViolation(format!(
146                "Relayer with ID {} already exists",
147                entity.id
148            )));
149        }
150
151        let serialized = self.serialize_entity(&entity, |r| &r.id, "relayer")?;
152
153        // Use pipeline for atomic operations
154        let mut pipe = redis::pipe();
155        pipe.atomic();
156        pipe.set(&relayer_key, &serialized);
157        pipe.sadd(self.relayer_list_key(), &entity.id);
158
159        pipe.exec_async(&mut conn)
160            .await
161            .map_err(|e| self.map_redis_error(e, "create_relayer_pipeline"))?;
162
163        debug!(relayer_id = %entity.id, "created relayer");
164        Ok(entity)
165    }
166
167    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
168        if id.is_empty() {
169            return Err(RepositoryError::InvalidData(
170                "Relayer ID cannot be empty".to_string(),
171            ));
172        }
173
174        let mut conn = self.client.as_ref().clone();
175        let relayer_key = self.relayer_key(&id);
176
177        debug!(relayer_id = %id, "fetching relayer");
178
179        let json: Option<String> = conn
180            .get(&relayer_key)
181            .await
182            .map_err(|e| self.map_redis_error(e, "get_relayer_by_id"))?;
183
184        match json {
185            Some(json) => {
186                debug!(relayer_id = %id, "found relayer");
187                self.deserialize_entity(&json, &id, "relayer")
188            }
189            None => {
190                debug!(relayer_id = %id, "relayer not found");
191                Err(RepositoryError::NotFound(format!(
192                    "Relayer with ID {id} not found"
193                )))
194            }
195        }
196    }
197
198    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
199        let mut conn = self.client.as_ref().clone();
200        let relayer_list_key = self.relayer_list_key();
201
202        debug!("listing all relayers");
203
204        let relayer_ids: Vec<String> = conn
205            .smembers(&relayer_list_key)
206            .await
207            .map_err(|e| self.map_redis_error(e, "list_all_relayers"))?;
208
209        debug!(count = %relayer_ids.len(), "found relayers in index");
210
211        let relayers = self.get_relayers_by_ids(&relayer_ids).await?;
212        Ok(relayers.results)
213    }
214
215    async fn list_paginated(
216        &self,
217        query: PaginationQuery,
218    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
219        if query.page == 0 {
220            return Err(RepositoryError::InvalidData(
221                "Page number must be greater than 0".to_string(),
222            ));
223        }
224
225        if query.per_page == 0 {
226            return Err(RepositoryError::InvalidData(
227                "Per page count must be greater than 0".to_string(),
228            ));
229        }
230
231        let mut conn = self.client.as_ref().clone();
232        let relayer_list_key = self.relayer_list_key();
233
234        // Get total count
235        let total: u64 = conn
236            .scard(&relayer_list_key)
237            .await
238            .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
239
240        if total == 0 {
241            return Ok(PaginatedResult {
242                items: vec![],
243                total: 0,
244                page: query.page,
245                per_page: query.per_page,
246            });
247        }
248
249        // Get all IDs and paginate in memory
250        let all_ids: Vec<String> = conn
251            .smembers(&relayer_list_key)
252            .await
253            .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
254
255        let start = ((query.page - 1) * query.per_page) as usize;
256        let end = (start + query.per_page as usize).min(all_ids.len());
257
258        let page_ids = &all_ids[start..end];
259        let items = self.get_relayers_by_ids(page_ids).await?;
260
261        Ok(PaginatedResult {
262            items: items.results.clone(),
263            total,
264            page: query.page,
265            per_page: query.per_page,
266        })
267    }
268
269    async fn update(
270        &self,
271        id: String,
272        entity: RelayerRepoModel,
273    ) -> Result<RelayerRepoModel, RepositoryError> {
274        if id.is_empty() {
275            return Err(RepositoryError::InvalidData(
276                "Relayer ID cannot be empty".to_string(),
277            ));
278        }
279
280        if entity.name.is_empty() {
281            return Err(RepositoryError::InvalidData(
282                "Relayer name cannot be empty".to_string(),
283            ));
284        }
285
286        let mut conn = self.client.as_ref().clone();
287        let relayer_key = self.relayer_key(&id);
288
289        // Check if relayer exists
290        let exists: bool = conn
291            .exists(&relayer_key)
292            .await
293            .map_err(|e| self.map_redis_error(e, "update_relayer_exists_check"))?;
294
295        if !exists {
296            return Err(RepositoryError::NotFound(format!(
297                "Relayer with ID {id} not found"
298            )));
299        }
300
301        // Ensure we preserve the original ID
302        let mut updated_entity = entity;
303        updated_entity.id = id.clone();
304
305        let serialized = self.serialize_entity(&updated_entity, |r| &r.id, "relayer")?;
306
307        // Use pipeline for atomic operations
308        let mut pipe = redis::pipe();
309        pipe.atomic();
310        pipe.set(&relayer_key, &serialized);
311        pipe.sadd(self.relayer_list_key(), &id);
312
313        pipe.exec_async(&mut conn)
314            .await
315            .map_err(|e| self.map_redis_error(e, "update_relayer_pipeline"))?;
316
317        debug!(relayer_id = %id, "updated relayer");
318        Ok(updated_entity)
319    }
320
321    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
322        if id.is_empty() {
323            return Err(RepositoryError::InvalidData(
324                "Relayer ID cannot be empty".to_string(),
325            ));
326        }
327
328        let mut conn = self.client.as_ref().clone();
329        let relayer_key = self.relayer_key(&id);
330
331        // Check if relayer exists
332        let exists: bool = conn
333            .exists(&relayer_key)
334            .await
335            .map_err(|e| self.map_redis_error(e, "delete_relayer_exists_check"))?;
336
337        if !exists {
338            return Err(RepositoryError::NotFound(format!(
339                "Relayer with ID {id} not found"
340            )));
341        }
342
343        // Use pipeline for atomic operations
344        let mut pipe = redis::pipe();
345        pipe.atomic();
346        pipe.del(&relayer_key);
347        pipe.srem(self.relayer_list_key(), &id);
348
349        pipe.exec_async(&mut conn)
350            .await
351            .map_err(|e| self.map_redis_error(e, "delete_relayer_pipeline"))?;
352
353        debug!(relayer_id = %id, "deleted relayer");
354        Ok(())
355    }
356
357    async fn count(&self) -> Result<usize, RepositoryError> {
358        let mut conn = self.client.as_ref().clone();
359        let relayer_list_key = self.relayer_list_key();
360
361        let count: u64 = conn
362            .scard(&relayer_list_key)
363            .await
364            .map_err(|e| self.map_redis_error(e, "count_relayers"))?;
365
366        Ok(count as usize)
367    }
368
369    async fn has_entries(&self) -> Result<bool, RepositoryError> {
370        let mut conn = self.client.as_ref().clone();
371        let relayer_list_key = self.relayer_list_key();
372
373        debug!("checking if relayer entries exist");
374
375        let exists: bool = conn
376            .exists(&relayer_list_key)
377            .await
378            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
379
380        debug!(exists = %exists, "relayer entries exist");
381        Ok(exists)
382    }
383
384    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
385        let mut conn = self.client.as_ref().clone();
386        let relayer_list_key = self.relayer_list_key();
387
388        debug!("dropping all relayer entries");
389
390        // Get all relayer IDs first
391        let relayer_ids: Vec<String> = conn
392            .smembers(&relayer_list_key)
393            .await
394            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
395
396        if relayer_ids.is_empty() {
397            debug!("no relayer entries to drop");
398            return Ok(());
399        }
400
401        // Use pipeline for atomic operations
402        let mut pipe = redis::pipe();
403        pipe.atomic();
404
405        // Delete all individual relayer entries
406        for relayer_id in &relayer_ids {
407            let relayer_key = self.relayer_key(relayer_id);
408            pipe.del(&relayer_key);
409        }
410
411        // Delete the relayer list key
412        pipe.del(&relayer_list_key);
413
414        pipe.exec_async(&mut conn)
415            .await
416            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
417
418        debug!(count = %relayer_ids.len(), "dropped relayer entries");
419        Ok(())
420    }
421}
422
423#[async_trait]
424impl RelayerRepository for RedisRelayerRepository {
425    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
426        let all_relayers = self.list_all().await?;
427        let active_relayers: Vec<RelayerRepoModel> = all_relayers
428            .into_iter()
429            .filter(|relayer| !relayer.paused)
430            .collect();
431
432        debug!(count = %active_relayers.len(), "found active relayers");
433        Ok(active_relayers)
434    }
435
436    async fn list_by_signer_id(
437        &self,
438        signer_id: &str,
439    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
440        let all_relayers = self.list_all().await?;
441        let relayers_with_signer: Vec<RelayerRepoModel> = all_relayers
442            .into_iter()
443            .filter(|relayer| relayer.signer_id == signer_id)
444            .collect();
445
446        debug!(count = %relayers_with_signer.len(), signer_id = %signer_id, "found relayers using signer");
447        Ok(relayers_with_signer)
448    }
449
450    async fn list_by_notification_id(
451        &self,
452        notification_id: &str,
453    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
454        let all_relayers = self.list_all().await?;
455        let relayers_with_notification: Vec<RelayerRepoModel> = all_relayers
456            .into_iter()
457            .filter(|relayer| {
458                relayer
459                    .notification_id
460                    .as_ref()
461                    .is_some_and(|id| id == notification_id)
462            })
463            .collect();
464
465        debug!(count = %relayers_with_notification.len(), notification_id = %notification_id, "found relayers using notification");
466        Ok(relayers_with_notification)
467    }
468
469    async fn partial_update(
470        &self,
471        id: String,
472        update: UpdateRelayerRequest,
473    ) -> Result<RelayerRepoModel, RepositoryError> {
474        // First get the current relayer
475        let mut relayer = self.get_by_id(id.clone()).await?;
476
477        // Apply the partial update
478        if let Some(paused) = update.paused {
479            relayer.paused = paused;
480        }
481
482        // Update the relayer
483        self.update(id, relayer).await
484    }
485
486    async fn enable_relayer(
487        &self,
488        relayer_id: String,
489    ) -> Result<RelayerRepoModel, RepositoryError> {
490        // First get the current relayer
491        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
492
493        // Update the system_disabled flag and clear reason
494        relayer.system_disabled = false;
495        relayer.disabled_reason = None;
496
497        // Update the relayer
498        self.update(relayer_id, relayer).await
499    }
500
501    async fn disable_relayer(
502        &self,
503        relayer_id: String,
504        reason: DisabledReason,
505    ) -> Result<RelayerRepoModel, RepositoryError> {
506        // First get the current relayer
507        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
508
509        // Update the system_disabled flag and set reason
510        relayer.system_disabled = true;
511        relayer.disabled_reason = Some(reason);
512
513        // Update the relayer
514        self.update(relayer_id, relayer).await
515    }
516
517    async fn update_policy(
518        &self,
519        id: String,
520        policy: RelayerNetworkPolicy,
521    ) -> Result<RelayerRepoModel, RepositoryError> {
522        // First get the current relayer
523        let mut relayer = self.get_by_id(id.clone()).await?;
524
525        // Update the policy
526        relayer.policies = policy;
527
528        // Update the relayer
529        self.update(id, relayer).await
530    }
531
532    fn is_persistent_storage(&self) -> bool {
533        true
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
541    use redis::aio::ConnectionManager;
542    use std::sync::Arc;
543
544    fn create_test_relayer(id: &str) -> RelayerRepoModel {
545        RelayerRepoModel {
546            id: id.to_string(),
547            name: format!("Test Relayer {}", id),
548            network: "ethereum".to_string(),
549            paused: false,
550            network_type: NetworkType::Evm,
551            signer_id: "test-signer".to_string(),
552            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
553            address: "0x742d35Cc6634C0532925a3b844Bc454e4438f44e".to_string(),
554            notification_id: None,
555            system_disabled: false,
556            disabled_reason: None,
557            custom_rpc_urls: None,
558        }
559    }
560
561    fn create_test_relayer_with_pause(id: &str, paused: bool) -> RelayerRepoModel {
562        let mut relayer = create_test_relayer(id);
563        relayer.paused = paused;
564        relayer
565    }
566
567    async fn setup_test_repo() -> RedisRelayerRepository {
568        let redis_url =
569            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
570        let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
571        let connection_manager = ConnectionManager::new(client)
572            .await
573            .expect("Failed to create Redis connection manager");
574
575        RedisRelayerRepository::new(Arc::new(connection_manager), "test".to_string())
576            .expect("Failed to create Redis relayer repository")
577    }
578
579    #[ignore = "Requires active Redis instance"]
580    #[tokio::test]
581    async fn test_new_repository_creation() {
582        let repo = setup_test_repo().await;
583        assert_eq!(repo.key_prefix, "test");
584    }
585
586    #[ignore = "Requires active Redis instance"]
587    #[tokio::test]
588    async fn test_new_repository_empty_prefix_fails() {
589        let redis_url =
590            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
591        let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
592        let connection_manager = ConnectionManager::new(client)
593            .await
594            .expect("Failed to create Redis connection manager");
595
596        let result = RedisRelayerRepository::new(Arc::new(connection_manager), "".to_string());
597        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
598    }
599
600    #[ignore = "Requires active Redis instance"]
601    #[tokio::test]
602    async fn test_key_generation() {
603        let repo = setup_test_repo().await;
604
605        let relayer_key = repo.relayer_key("test-relayer");
606        assert_eq!(relayer_key, "test:relayer:test-relayer");
607
608        let list_key = repo.relayer_list_key();
609        assert_eq!(list_key, "test:relayer_list");
610    }
611
612    #[ignore = "Requires active Redis instance"]
613    #[tokio::test]
614    async fn test_serialize_deserialize_relayer() {
615        let repo = setup_test_repo().await;
616        let relayer = create_test_relayer("test-relayer");
617
618        let serialized = repo
619            .serialize_entity(&relayer, |r| &r.id, "relayer")
620            .unwrap();
621        let deserialized: RelayerRepoModel = repo
622            .deserialize_entity(&serialized, &relayer.id, "relayer")
623            .unwrap();
624
625        assert_eq!(relayer.id, deserialized.id);
626        assert_eq!(relayer.name, deserialized.name);
627        assert_eq!(relayer.network, deserialized.network);
628        assert_eq!(relayer.paused, deserialized.paused);
629        assert_eq!(relayer.network_type, deserialized.network_type);
630        assert_eq!(relayer.signer_id, deserialized.signer_id);
631        assert_eq!(relayer.address, deserialized.address);
632        assert_eq!(relayer.notification_id, deserialized.notification_id);
633        assert_eq!(relayer.system_disabled, deserialized.system_disabled);
634        assert_eq!(relayer.custom_rpc_urls, deserialized.custom_rpc_urls);
635    }
636
637    #[ignore = "Requires active Redis instance"]
638    #[tokio::test]
639    async fn test_create_relayer() {
640        let repo = setup_test_repo().await;
641        let relayer_id = uuid::Uuid::new_v4().to_string();
642        let relayer = create_test_relayer(&relayer_id);
643
644        let result = repo.create(relayer.clone()).await;
645        assert!(result.is_ok());
646
647        let created_relayer = result.unwrap();
648        assert_eq!(created_relayer.id, relayer_id);
649        assert_eq!(created_relayer.name, relayer.name);
650    }
651
652    #[ignore = "Requires active Redis instance"]
653    #[tokio::test]
654    async fn test_get_relayer() {
655        let repo = setup_test_repo().await;
656        let relayer_id = uuid::Uuid::new_v4().to_string();
657        let relayer = create_test_relayer(&relayer_id);
658
659        repo.create(relayer.clone()).await.unwrap();
660
661        let retrieved = repo.get_by_id(relayer_id).await.unwrap();
662        assert_eq!(retrieved.id, relayer.id);
663        assert_eq!(retrieved.name, relayer.name);
664    }
665
666    #[ignore = "Requires active Redis instance"]
667    #[tokio::test]
668    async fn test_list_all_relayers() {
669        let repo = setup_test_repo().await;
670        let relayer1_id = uuid::Uuid::new_v4().to_string();
671        let relayer2_id = uuid::Uuid::new_v4().to_string();
672        let relayer1 = create_test_relayer(&relayer1_id);
673        let relayer2 = create_test_relayer(&relayer2_id);
674
675        repo.create(relayer1).await.unwrap();
676        repo.create(relayer2).await.unwrap();
677
678        let all_relayers = repo.list_all().await.unwrap();
679        assert!(all_relayers.len() >= 2);
680    }
681
682    #[ignore = "Requires active Redis instance"]
683    #[tokio::test]
684    async fn test_list_active_relayers() {
685        let repo = setup_test_repo().await;
686        let relayer1_id = uuid::Uuid::new_v4().to_string();
687        let relayer2_id = uuid::Uuid::new_v4().to_string();
688        let relayer1 = create_test_relayer_with_pause(&relayer1_id, false);
689        let relayer2 = create_test_relayer_with_pause(&relayer2_id, true);
690
691        repo.create(relayer1).await.unwrap();
692        repo.create(relayer2).await.unwrap();
693
694        let active_relayers = repo.list_active().await.unwrap();
695        // Should have at least 1 active relayer
696        assert!(!active_relayers.is_empty());
697        // All returned relayers should be active
698        assert!(active_relayers.iter().all(|r| !r.paused));
699    }
700
701    #[ignore = "Requires active Redis instance"]
702    #[tokio::test]
703    async fn test_count_relayers() {
704        let repo = setup_test_repo().await;
705        let relayer_id = uuid::Uuid::new_v4().to_string();
706        let relayer = create_test_relayer(&relayer_id);
707
708        repo.create(relayer).await.unwrap();
709
710        let count = repo.count().await.unwrap();
711        assert!(count >= 1);
712    }
713
714    #[ignore = "Requires active Redis instance"]
715    #[tokio::test]
716    async fn test_get_nonexistent_relayer() {
717        let repo = setup_test_repo().await;
718
719        let result = repo.get_by_id("nonexistent-relayer".to_string()).await;
720        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
721    }
722
723    #[ignore = "Requires active Redis instance"]
724    #[tokio::test]
725    async fn test_duplicate_relayer_creation() {
726        let repo = setup_test_repo().await;
727        let relayer_id = uuid::Uuid::new_v4().to_string();
728        let relayer = create_test_relayer(&relayer_id);
729
730        repo.create(relayer.clone()).await.unwrap();
731
732        let duplicate_result = repo.create(relayer).await;
733        assert!(matches!(
734            duplicate_result,
735            Err(RepositoryError::ConstraintViolation(_))
736        ));
737    }
738
739    #[ignore = "Requires active Redis instance"]
740    #[tokio::test]
741    async fn test_update_relayer() {
742        let repo = setup_test_repo().await;
743        let relayer_id = uuid::Uuid::new_v4().to_string();
744        let relayer = create_test_relayer(&relayer_id);
745
746        repo.create(relayer.clone()).await.unwrap();
747
748        let mut updated_relayer = relayer.clone();
749        updated_relayer.name = "Updated Relayer Name".to_string();
750
751        let result = repo.update(relayer.id.clone(), updated_relayer).await;
752        assert!(result.is_ok());
753
754        let updated = result.unwrap();
755        assert_eq!(updated.name, "Updated Relayer Name");
756        assert_eq!(updated.id, relayer.id);
757    }
758
759    #[ignore = "Requires active Redis instance"]
760    #[tokio::test]
761    async fn test_delete_relayer() {
762        let repo = setup_test_repo().await;
763        let relayer_id = uuid::Uuid::new_v4().to_string();
764        let relayer = create_test_relayer(&relayer_id);
765
766        repo.create(relayer.clone()).await.unwrap();
767
768        let delete_result = repo.delete_by_id(relayer.id.clone()).await;
769        assert!(delete_result.is_ok());
770
771        let get_result = repo.get_by_id(relayer.id).await;
772        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
773    }
774
775    #[ignore = "Requires active Redis instance"]
776    #[tokio::test]
777    async fn test_list_paginated() {
778        let repo = setup_test_repo().await;
779        let relayer1_id = uuid::Uuid::new_v4().to_string();
780        let relayer2_id = uuid::Uuid::new_v4().to_string();
781        let relayer1 = create_test_relayer(&relayer1_id);
782        let relayer2 = create_test_relayer(&relayer2_id);
783
784        repo.create(relayer1).await.unwrap();
785        repo.create(relayer2).await.unwrap();
786
787        let query = PaginationQuery {
788            page: 1,
789            per_page: 10,
790        };
791
792        let result = repo.list_paginated(query).await.unwrap();
793        assert!(result.total >= 2);
794        assert_eq!(result.page, 1);
795        assert_eq!(result.per_page, 10);
796    }
797
798    #[ignore = "Requires active Redis instance"]
799    #[tokio::test]
800    async fn test_partial_update_relayer() {
801        let repo = setup_test_repo().await;
802        let relayer_id = uuid::Uuid::new_v4().to_string();
803        let relayer = create_test_relayer(&relayer_id);
804
805        repo.create(relayer.clone()).await.unwrap();
806
807        let update = UpdateRelayerRequest {
808            paused: Some(true),
809            ..Default::default()
810        };
811        let result = repo.partial_update(relayer.id.clone(), update).await;
812        assert!(result.is_ok());
813
814        let updated = result.unwrap();
815        assert_eq!(updated.id, relayer.id);
816        assert!(updated.paused);
817    }
818
819    #[ignore = "Requires active Redis instance"]
820    #[tokio::test]
821    async fn test_enable_disable_relayer() {
822        let repo = setup_test_repo().await;
823        let relayer_id = uuid::Uuid::new_v4().to_string();
824        let relayer = create_test_relayer(&relayer_id);
825
826        repo.create(relayer.clone()).await.unwrap();
827
828        // Test disable
829        let disabled = repo
830            .disable_relayer(
831                relayer.id.clone(),
832                DisabledReason::BalanceCheckFailed("test reason".to_string()),
833            )
834            .await
835            .unwrap();
836        assert!(disabled.system_disabled);
837
838        // Test enable
839        let enabled = repo.enable_relayer(relayer.id.clone()).await.unwrap();
840        assert!(!enabled.system_disabled);
841    }
842
843    #[ignore = "Requires active Redis instance"]
844    #[tokio::test]
845    async fn test_update_policy() {
846        let repo = setup_test_repo().await;
847        let relayer_id = uuid::Uuid::new_v4().to_string();
848        let relayer = create_test_relayer(&relayer_id);
849
850        repo.create(relayer.clone()).await.unwrap();
851
852        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
853            gas_price_cap: Some(50_000_000_000),
854            whitelist_receivers: Some(vec!["0x123".to_string()]),
855            eip1559_pricing: Some(true),
856            private_transactions: Some(true),
857            min_balance: Some(1000000000000000000),
858            gas_limit_estimation: Some(true),
859        });
860
861        let result = repo.update_policy(relayer.id.clone(), new_policy).await;
862        assert!(result.is_ok());
863
864        let updated = result.unwrap();
865        if let RelayerNetworkPolicy::Evm(evm_policy) = updated.policies {
866            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
867            assert_eq!(
868                evm_policy.whitelist_receivers,
869                Some(vec!["0x123".to_string()])
870            );
871            assert_eq!(evm_policy.eip1559_pricing, Some(true));
872            assert!(evm_policy.private_transactions.unwrap_or(false));
873            assert_eq!(evm_policy.min_balance, Some(1000000000000000000));
874        } else {
875            panic!("Expected EVM policy");
876        }
877    }
878
879    #[ignore = "Requires active Redis instance"]
880    #[tokio::test]
881    async fn test_debug_implementation() {
882        let repo = setup_test_repo().await;
883        let debug_str = format!("{:?}", repo);
884        assert!(debug_str.contains("RedisRelayerRepository"));
885        assert!(debug_str.contains("key_prefix"));
886    }
887
888    #[ignore = "Requires active Redis instance"]
889    #[tokio::test]
890    async fn test_error_handling_empty_id() {
891        let repo = setup_test_repo().await;
892
893        let create_result = repo
894            .create(RelayerRepoModel {
895                id: "".to_string(),
896                ..create_test_relayer("test")
897            })
898            .await;
899        assert!(matches!(
900            create_result,
901            Err(RepositoryError::InvalidData(_))
902        ));
903
904        let get_result = repo.get_by_id("".to_string()).await;
905        assert!(matches!(get_result, Err(RepositoryError::InvalidData(_))));
906
907        let update_result = repo
908            .update("".to_string(), create_test_relayer("test"))
909            .await;
910        assert!(matches!(
911            update_result,
912            Err(RepositoryError::InvalidData(_))
913        ));
914
915        let delete_result = repo.delete_by_id("".to_string()).await;
916        assert!(matches!(
917            delete_result,
918            Err(RepositoryError::InvalidData(_))
919        ));
920    }
921
922    #[ignore = "Requires active Redis instance"]
923    #[tokio::test]
924    async fn test_error_handling_empty_name() {
925        let repo = setup_test_repo().await;
926
927        let create_result = repo
928            .create(RelayerRepoModel {
929                name: "".to_string(),
930                ..create_test_relayer("test")
931            })
932            .await;
933        assert!(matches!(
934            create_result,
935            Err(RepositoryError::InvalidData(_))
936        ));
937    }
938
939    #[ignore = "Requires active Redis instance"]
940    #[tokio::test]
941    async fn test_pagination_validation() {
942        let repo = setup_test_repo().await;
943
944        let invalid_page = PaginationQuery {
945            page: 0,
946            per_page: 10,
947        };
948        let result = repo.list_paginated(invalid_page).await;
949        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
950
951        let invalid_per_page = PaginationQuery {
952            page: 1,
953            per_page: 0,
954        };
955        let result = repo.list_paginated(invalid_per_page).await;
956        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
957    }
958
959    #[ignore = "Requires active Redis instance"]
960    #[tokio::test]
961    async fn test_update_nonexistent_relayer() {
962        let repo = setup_test_repo().await;
963        let relayer = create_test_relayer("nonexistent-relayer");
964
965        let result = repo
966            .update("nonexistent-relayer".to_string(), relayer)
967            .await;
968        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
969    }
970
971    #[ignore = "Requires active Redis instance"]
972    #[tokio::test]
973    async fn test_delete_nonexistent_relayer() {
974        let repo = setup_test_repo().await;
975
976        let result = repo.delete_by_id("nonexistent-relayer".to_string()).await;
977        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
978    }
979
980    #[tokio::test]
981    #[ignore = "Requires active Redis instance"]
982    async fn test_has_entries() {
983        let repo = setup_test_repo().await;
984        assert!(!repo.has_entries().await.unwrap());
985
986        let relayer_id = uuid::Uuid::new_v4().to_string();
987        let relayer = create_test_relayer(&relayer_id);
988        repo.create(relayer.clone()).await.unwrap();
989        assert!(repo.has_entries().await.unwrap());
990    }
991
992    #[tokio::test]
993    #[ignore = "Requires active Redis instance"]
994    async fn test_drop_all_entries() {
995        let repo = setup_test_repo().await;
996        let relayer_id = uuid::Uuid::new_v4().to_string();
997        let relayer = create_test_relayer(&relayer_id);
998        repo.create(relayer.clone()).await.unwrap();
999        assert!(repo.has_entries().await.unwrap());
1000
1001        repo.drop_all_entries().await.unwrap();
1002        assert!(!repo.has_entries().await.unwrap());
1003    }
1004
1005    #[ignore = "Requires active Redis instance"]
1006    #[tokio::test]
1007    async fn test_list_by_signer_id() {
1008        let repo = setup_test_repo().await;
1009
1010        let relayer1_id = uuid::Uuid::new_v4().to_string();
1011        let relayer2_id = uuid::Uuid::new_v4().to_string();
1012        let relayer3_id = uuid::Uuid::new_v4().to_string();
1013        let signer1_id = uuid::Uuid::new_v4().to_string();
1014        let signer2_id = uuid::Uuid::new_v4().to_string();
1015
1016        let mut relayer1 = create_test_relayer(&relayer1_id);
1017        relayer1.signer_id = signer1_id.clone();
1018        repo.create(relayer1).await.unwrap();
1019
1020        let mut relayer2 = create_test_relayer(&relayer2_id);
1021
1022        relayer2.signer_id = signer2_id.clone();
1023        repo.create(relayer2).await.unwrap();
1024
1025        let mut relayer3 = create_test_relayer(&relayer3_id);
1026        relayer3.signer_id = signer1_id.clone();
1027        repo.create(relayer3).await.unwrap();
1028
1029        let result = repo.list_by_signer_id(&signer1_id).await.unwrap();
1030        assert_eq!(result.len(), 2);
1031        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1032        assert!(ids.contains(&relayer1_id));
1033        assert!(ids.contains(&relayer3_id));
1034
1035        let result = repo.list_by_signer_id(&signer2_id).await.unwrap();
1036        assert_eq!(result.len(), 1);
1037
1038        let result = repo.list_by_signer_id("nonexistent").await.unwrap();
1039        assert_eq!(result.len(), 0);
1040    }
1041
1042    #[ignore = "Requires active Redis instance"]
1043    #[tokio::test]
1044    async fn test_list_by_notification_id() {
1045        let repo = setup_test_repo().await;
1046
1047        let relayer1_id = uuid::Uuid::new_v4().to_string();
1048        let mut relayer1 = create_test_relayer(&relayer1_id);
1049        relayer1.notification_id = Some("notif1".to_string());
1050        repo.create(relayer1).await.unwrap();
1051
1052        let relayer2_id = uuid::Uuid::new_v4().to_string();
1053        let mut relayer2 = create_test_relayer(&relayer2_id);
1054        relayer2.notification_id = Some("notif2".to_string());
1055        repo.create(relayer2).await.unwrap();
1056
1057        let relayer3_id = uuid::Uuid::new_v4().to_string();
1058        let mut relayer3 = create_test_relayer(&relayer3_id);
1059        relayer3.notification_id = Some("notif1".to_string());
1060        repo.create(relayer3).await.unwrap();
1061
1062        let relayer4_id = uuid::Uuid::new_v4().to_string();
1063        let mut relayer4 = create_test_relayer(&relayer4_id);
1064        relayer4.notification_id = None;
1065        repo.create(relayer4).await.unwrap();
1066
1067        let result = repo.list_by_notification_id("notif1").await.unwrap();
1068        assert_eq!(result.len(), 2);
1069        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1070        assert!(ids.contains(&relayer1_id));
1071        assert!(ids.contains(&relayer3_id));
1072
1073        let result = repo.list_by_notification_id("notif2").await.unwrap();
1074        assert_eq!(result.len(), 1);
1075
1076        let result = repo.list_by_notification_id("nonexistent").await.unwrap();
1077        assert_eq!(result.len(), 0);
1078    }
1079}