openzeppelin_relayer/repositories/relayer/
mod.rs

1//! Relayer Repository Module
2//!
3//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract relayer data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
10//! - **Status Management**: Enable/disable relayers and track their state
11//! - **Policy Management**: Update relayer network policies
12//! - **Partial Updates**: Support for partial relayer configuration updates
13//! - **Active Filtering**: Query for active (non-paused) relayers
14//! - **Pagination Support**: Efficient paginated listing of relayers
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
20//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{
31        DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32    },
33    repositories::{PaginatedResult, Repository},
34};
35use async_trait::async_trait;
36use redis::aio::ConnectionManager;
37use std::sync::Arc;
38
39#[async_trait]
40pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
41    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
42    async fn list_by_signer_id(
43        &self,
44        signer_id: &str,
45    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
46    async fn list_by_notification_id(
47        &self,
48        notification_id: &str,
49    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
50    async fn partial_update(
51        &self,
52        id: String,
53        update: UpdateRelayerRequest,
54    ) -> Result<RelayerRepoModel, RepositoryError>;
55    async fn enable_relayer(&self, relayer_id: String)
56        -> Result<RelayerRepoModel, RepositoryError>;
57    async fn disable_relayer(
58        &self,
59        relayer_id: String,
60        reason: DisabledReason,
61    ) -> Result<RelayerRepoModel, RepositoryError>;
62    async fn update_policy(
63        &self,
64        id: String,
65        policy: RelayerNetworkPolicy,
66    ) -> Result<RelayerRepoModel, RepositoryError>;
67    /// Returns true if this repository uses persistent storage (e.g., Redis).
68    /// Returns false for in-memory storage.
69    fn is_persistent_storage(&self) -> bool;
70}
71
72/// Enum wrapper for different relayer repository implementations
73#[derive(Debug, Clone)]
74pub enum RelayerRepositoryStorage {
75    InMemory(InMemoryRelayerRepository),
76    Redis(RedisRelayerRepository),
77}
78
79impl RelayerRepositoryStorage {
80    pub fn new_in_memory() -> Self {
81        Self::InMemory(InMemoryRelayerRepository::new())
82    }
83
84    pub fn new_redis(
85        connection_manager: Arc<ConnectionManager>,
86        key_prefix: String,
87    ) -> Result<Self, RepositoryError> {
88        Ok(Self::Redis(RedisRelayerRepository::new(
89            connection_manager,
90            key_prefix,
91        )?))
92    }
93}
94
95impl Default for RelayerRepositoryStorage {
96    fn default() -> Self {
97        Self::new_in_memory()
98    }
99}
100
101#[async_trait]
102impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
103    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
104        match self {
105            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
106            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
107        }
108    }
109
110    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
111        match self {
112            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
113            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
114        }
115    }
116
117    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
118        match self {
119            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
120            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
121        }
122    }
123
124    async fn list_paginated(
125        &self,
126        query: PaginationQuery,
127    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
128        match self {
129            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
130            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
131        }
132    }
133
134    async fn update(
135        &self,
136        id: String,
137        entity: RelayerRepoModel,
138    ) -> Result<RelayerRepoModel, RepositoryError> {
139        match self {
140            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
141            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
142        }
143    }
144
145    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
146        match self {
147            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
148            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
149        }
150    }
151
152    async fn count(&self) -> Result<usize, RepositoryError> {
153        match self {
154            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
155            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
156        }
157    }
158
159    async fn has_entries(&self) -> Result<bool, RepositoryError> {
160        match self {
161            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
162            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
163        }
164    }
165
166    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
167        match self {
168            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
169            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
170        }
171    }
172}
173
174#[async_trait]
175impl RelayerRepository for RelayerRepositoryStorage {
176    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
177        match self {
178            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
179            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
180        }
181    }
182
183    async fn list_by_signer_id(
184        &self,
185        signer_id: &str,
186    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
187        match self {
188            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
189            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
190        }
191    }
192
193    async fn list_by_notification_id(
194        &self,
195        notification_id: &str,
196    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
197        match self {
198            RelayerRepositoryStorage::InMemory(repo) => {
199                repo.list_by_notification_id(notification_id).await
200            }
201            RelayerRepositoryStorage::Redis(repo) => {
202                repo.list_by_notification_id(notification_id).await
203            }
204        }
205    }
206
207    async fn partial_update(
208        &self,
209        id: String,
210        update: UpdateRelayerRequest,
211    ) -> Result<RelayerRepoModel, RepositoryError> {
212        match self {
213            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
214            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
215        }
216    }
217
218    async fn enable_relayer(
219        &self,
220        relayer_id: String,
221    ) -> Result<RelayerRepoModel, RepositoryError> {
222        match self {
223            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
224            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
225        }
226    }
227
228    async fn disable_relayer(
229        &self,
230        relayer_id: String,
231        reason: DisabledReason,
232    ) -> Result<RelayerRepoModel, RepositoryError> {
233        match self {
234            RelayerRepositoryStorage::InMemory(repo) => {
235                repo.disable_relayer(relayer_id, reason).await
236            }
237            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
238        }
239    }
240
241    async fn update_policy(
242        &self,
243        id: String,
244        policy: RelayerNetworkPolicy,
245    ) -> Result<RelayerRepoModel, RepositoryError> {
246        match self {
247            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
248            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
249        }
250    }
251
252    fn is_persistent_storage(&self) -> bool {
253        match self {
254            RelayerRepositoryStorage::InMemory(_) => false,
255            RelayerRepositoryStorage::Redis(_) => true,
256        }
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
264
265    fn create_test_relayer(id: String) -> RelayerRepoModel {
266        RelayerRepoModel {
267            id: id.clone(),
268            name: format!("Relayer {}", id.clone()),
269            network: "TestNet".to_string(),
270            paused: false,
271            network_type: NetworkType::Evm,
272            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
273                min_balance: Some(0),
274                gas_limit_estimation: Some(true),
275                gas_price_cap: None,
276                whitelist_receivers: None,
277                eip1559_pricing: Some(false),
278                private_transactions: Some(false),
279            }),
280            signer_id: "test".to_string(),
281            address: "0x".to_string(),
282            notification_id: None,
283            system_disabled: false,
284            custom_rpc_urls: None,
285            ..Default::default()
286        }
287    }
288
289    #[actix_web::test]
290    async fn test_in_memory_repository_impl() {
291        let impl_repo = RelayerRepositoryStorage::new_in_memory();
292        let relayer = create_test_relayer("test-relayer".to_string());
293
294        // Test create
295        let created = impl_repo.create(relayer.clone()).await.unwrap();
296        assert_eq!(created.id, relayer.id);
297
298        // Test get
299        let retrieved = impl_repo
300            .get_by_id("test-relayer".to_string())
301            .await
302            .unwrap();
303        assert_eq!(retrieved.id, relayer.id);
304
305        // Test list all
306        let all_relayers = impl_repo.list_all().await.unwrap();
307        assert!(!all_relayers.is_empty());
308
309        // Test count
310        let count = impl_repo.count().await.unwrap();
311        assert!(count >= 1);
312
313        // Test update
314        let mut updated_relayer = relayer.clone();
315        updated_relayer.name = "Updated Name".to_string();
316        let updated = impl_repo
317            .update(relayer.id.clone(), updated_relayer)
318            .await
319            .unwrap();
320        assert_eq!(updated.name, "Updated Name");
321
322        // Test delete
323        impl_repo.delete_by_id(relayer.id.clone()).await.unwrap();
324        let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
325        assert!(get_result.is_err());
326    }
327
328    #[actix_web::test]
329    async fn test_relayer_repository_trait_methods() {
330        let impl_repo = RelayerRepositoryStorage::new_in_memory();
331        let relayer = create_test_relayer("test-relayer".to_string());
332
333        // Create the relayer first
334        impl_repo.create(relayer.clone()).await.unwrap();
335
336        // Test list_active
337        let active_relayers = impl_repo.list_active().await.unwrap();
338        assert!(!active_relayers.is_empty());
339
340        // Test partial_update
341        let update = UpdateRelayerRequest {
342            paused: Some(true),
343            ..Default::default()
344        };
345        let updated = impl_repo
346            .partial_update(relayer.id.clone(), update)
347            .await
348            .unwrap();
349        assert!(updated.paused);
350
351        // Test enable/disable
352        let disabled = impl_repo
353            .disable_relayer(
354                relayer.id.clone(),
355                DisabledReason::BalanceCheckFailed("Test disable reason".to_string()),
356            )
357            .await
358            .unwrap();
359        assert!(disabled.system_disabled);
360        assert_eq!(
361            disabled.disabled_reason,
362            Some(DisabledReason::BalanceCheckFailed(
363                "Test disable reason".to_string()
364            ))
365        );
366
367        let enabled = impl_repo.enable_relayer(relayer.id.clone()).await.unwrap();
368        assert!(!enabled.system_disabled);
369        assert_eq!(enabled.disabled_reason, None);
370
371        // Test update_policy
372        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
373            min_balance: Some(1000000000000000000),
374            gas_limit_estimation: Some(true),
375            gas_price_cap: Some(50_000_000_000),
376            whitelist_receivers: None,
377            eip1559_pricing: Some(true),
378            private_transactions: Some(false),
379        });
380        let policy_updated = impl_repo
381            .update_policy(relayer.id.clone(), new_policy)
382            .await
383            .unwrap();
384
385        if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
386            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
387            assert_eq!(evm_policy.eip1559_pricing, Some(true));
388        } else {
389            panic!("Expected EVM policy");
390        }
391    }
392
393    #[actix_web::test]
394    async fn test_create_repository_in_memory() {
395        let result = RelayerRepositoryStorage::new_in_memory();
396
397        assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
398    }
399
400    #[actix_web::test]
401    async fn test_pagination() {
402        let impl_repo = RelayerRepositoryStorage::new_in_memory();
403        let relayer1 = create_test_relayer("test-relayer-1".to_string());
404        let relayer2 = create_test_relayer("test-relayer-2".to_string());
405
406        impl_repo.create(relayer1).await.unwrap();
407        impl_repo.create(relayer2).await.unwrap();
408
409        let query = PaginationQuery {
410            page: 1,
411            per_page: 10,
412        };
413
414        let result = impl_repo.list_paginated(query).await.unwrap();
415        assert!(result.total >= 2);
416        assert_eq!(result.page, 1);
417        assert_eq!(result.per_page, 10);
418    }
419
420    #[actix_web::test]
421    async fn test_delete_relayer() {
422        let impl_repo = RelayerRepositoryStorage::new_in_memory();
423        let relayer = create_test_relayer("delete-test".to_string());
424
425        // Create relayer
426        impl_repo.create(relayer.clone()).await.unwrap();
427
428        // Delete relayer
429        impl_repo
430            .delete_by_id("delete-test".to_string())
431            .await
432            .unwrap();
433
434        // Verify deletion
435        let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
436        assert!(get_result.is_err());
437        assert!(matches!(
438            get_result.unwrap_err(),
439            RepositoryError::NotFound(_)
440        ));
441
442        // Test deleting non-existent relayer
443        let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
444        assert!(delete_result.is_err());
445    }
446
447    #[actix_web::test]
448    async fn test_has_entries() {
449        let repo = InMemoryRelayerRepository::new();
450        assert!(!repo.has_entries().await.unwrap());
451
452        let relayer = create_test_relayer("test".to_string());
453
454        repo.create(relayer.clone()).await.unwrap();
455        assert!(repo.has_entries().await.unwrap());
456
457        repo.delete_by_id(relayer.id.clone()).await.unwrap();
458        assert!(!repo.has_entries().await.unwrap());
459    }
460
461    #[actix_web::test]
462    async fn test_drop_all_entries() {
463        let repo = InMemoryRelayerRepository::new();
464        let relayer = create_test_relayer("test".to_string());
465
466        repo.create(relayer.clone()).await.unwrap();
467        assert!(repo.has_entries().await.unwrap());
468
469        repo.drop_all_entries().await.unwrap();
470        assert!(!repo.has_entries().await.unwrap());
471    }
472}
473
474#[cfg(test)]
475mockall::mock! {
476    pub RelayerRepository {}
477
478    #[async_trait]
479    impl Repository<RelayerRepoModel, String> for RelayerRepository {
480        async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
481        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
482        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
483        async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
484        async fn update(&self, id: String, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
485        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
486        async fn count(&self) -> Result<usize, RepositoryError>;
487        async fn has_entries(&self) -> Result<bool, RepositoryError>;
488        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
489    }
490
491    #[async_trait]
492    impl RelayerRepository for RelayerRepository {
493        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
494        async fn list_by_signer_id(&self, signer_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
495        async fn list_by_notification_id(&self, notification_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
496        async fn partial_update(&self, id: String, update: UpdateRelayerRequest) -> Result<RelayerRepoModel, RepositoryError>;
497        async fn enable_relayer(&self, relayer_id: String) -> Result<RelayerRepoModel, RepositoryError>;
498        async fn disable_relayer(&self, relayer_id: String, reason: DisabledReason) -> Result<RelayerRepoModel, RepositoryError>;
499        async fn update_policy(&self, id: String, policy: RelayerNetworkPolicy) -> Result<RelayerRepoModel, RepositoryError>;
500        fn is_persistent_storage(&self) -> bool;
501    }
502}