openzeppelin_relayer/jobs/
queue.rs1use std::{env, sync::Arc};
11
12use apalis_redis::{Config, ConnectionManager, RedisStorage};
13use color_eyre::{eyre, Result};
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16use tracing::error;
17
18use crate::config::ServerConfig;
19
20use super::{
21 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
22 TransactionSend, TransactionStatusCheck,
23};
24
25#[derive(Clone, Debug)]
26pub struct Queue {
27 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
28 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
29 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
31 pub transaction_status_queue_evm: RedisStorage<Job<TransactionStatusCheck>>,
33 pub transaction_status_queue_stellar: RedisStorage<Job<TransactionStatusCheck>>,
35 pub notification_queue: RedisStorage<Job<NotificationSend>>,
36 pub token_swap_request_queue: RedisStorage<Job<TokenSwapRequest>>,
37 pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
38}
39
40impl Queue {
41 async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
42 namespace: &str,
43 shared: Arc<ConnectionManager>,
44 ) -> Result<RedisStorage<T>> {
45 let config = Config::default()
46 .set_namespace(namespace)
47 .set_enqueue_scheduled(Duration::from_secs(1)); Ok(RedisStorage::new_with_config((*shared).clone(), config))
50 }
51
52 pub async fn setup() -> Result<Self> {
53 let config = ServerConfig::from_env();
54 let redis_url = config.redis_url.clone();
55 let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
56 let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
57 Ok(result) => result.map_err(|e| {
58 error!(redis_url = %redis_url, error = %e, "failed to connect to redis");
59 eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
60 })?,
61 Err(_) => {
62 error!(redis_url = %redis_url, "timeout connecting to redis");
63 return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
64 }
65 };
66
67 let shared = Arc::new(conn);
68 let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
70 .ok()
71 .filter(|v| !v.is_empty())
72 .map(|value| format!("{value}:queue:"))
73 .unwrap_or_default();
74 Ok(Self {
75 transaction_request_queue: Self::storage(
76 &format!("{redis_key_prefix}transaction_request_queue"),
77 shared.clone(),
78 )
79 .await?,
80 transaction_submission_queue: Self::storage(
81 &format!("{redis_key_prefix}transaction_submission_queue"),
82 shared.clone(),
83 )
84 .await?,
85 transaction_status_queue: Self::storage(
86 &format!("{redis_key_prefix}transaction_status_queue"),
87 shared.clone(),
88 )
89 .await?,
90 transaction_status_queue_evm: Self::storage(
91 &format!("{redis_key_prefix}transaction_status_queue_evm"),
92 shared.clone(),
93 )
94 .await?,
95 transaction_status_queue_stellar: Self::storage(
96 &format!("{redis_key_prefix}transaction_status_queue_stellar"),
97 shared.clone(),
98 )
99 .await?,
100 notification_queue: Self::storage(
101 &format!("{redis_key_prefix}notification_queue"),
102 shared.clone(),
103 )
104 .await?,
105 token_swap_request_queue: Self::storage(
106 &format!("{redis_key_prefix}token_swap_request_queue"),
107 shared.clone(),
108 )
109 .await?,
110 relayer_health_check_queue: Self::storage(
111 &format!("{redis_key_prefix}relayer_health_check_queue"),
112 shared.clone(),
113 )
114 .await?,
115 })
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[tokio::test]
124 async fn test_queue_storage_configuration() {
125 let namespace = "test_namespace";
127 let config = Config::default().set_namespace(namespace);
128
129 assert_eq!(config.get_namespace(), namespace);
130 }
131
132 #[derive(Clone, Debug)]
134 struct MockQueue {
135 pub namespace_transaction_request: String,
136 pub namespace_transaction_submission: String,
137 pub namespace_transaction_status: String,
138 pub namespace_transaction_status_evm: String,
139 pub namespace_transaction_status_stellar: String,
140 pub namespace_notification: String,
141 pub namespace_token_swap_request_queue: String,
142 pub namespace_relayer_health_check_queue: String,
143 }
144
145 impl MockQueue {
146 fn new() -> Self {
147 Self {
148 namespace_transaction_request: "transaction_request_queue".to_string(),
149 namespace_transaction_submission: "transaction_submission_queue".to_string(),
150 namespace_transaction_status: "transaction_status_queue".to_string(),
151 namespace_transaction_status_evm: "transaction_status_queue_evm".to_string(),
152 namespace_transaction_status_stellar: "transaction_status_queue_stellar"
153 .to_string(),
154 namespace_notification: "notification_queue".to_string(),
155 namespace_token_swap_request_queue: "token_swap_request_queue".to_string(),
156 namespace_relayer_health_check_queue: "relayer_health_check_queue".to_string(),
157 }
158 }
159 }
160
161 #[test]
162 fn test_queue_namespaces() {
163 let mock_queue = MockQueue::new();
164
165 assert_eq!(
166 mock_queue.namespace_transaction_request,
167 "transaction_request_queue"
168 );
169 assert_eq!(
170 mock_queue.namespace_transaction_submission,
171 "transaction_submission_queue"
172 );
173 assert_eq!(
174 mock_queue.namespace_transaction_status,
175 "transaction_status_queue"
176 );
177 assert_eq!(
178 mock_queue.namespace_transaction_status_evm,
179 "transaction_status_queue_evm"
180 );
181 assert_eq!(
182 mock_queue.namespace_transaction_status_stellar,
183 "transaction_status_queue_stellar"
184 );
185 assert_eq!(mock_queue.namespace_notification, "notification_queue");
186 assert_eq!(
187 mock_queue.namespace_token_swap_request_queue,
188 "token_swap_request_queue"
189 );
190 assert_eq!(
191 mock_queue.namespace_relayer_health_check_queue,
192 "relayer_health_check_queue"
193 );
194 }
195}