1use crate::{
6 config::ServerConfig,
7 constants::{
8 DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9 DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
10 DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR, DEFAULT_CONCURRENCY_TOKEN_SWAP,
11 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12 WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13 WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_CLEANUP_RETRIES,
14 WORKER_TRANSACTION_REQUEST_RETRIES, WORKER_TRANSACTION_STATUS_CHECKER_RETRIES,
15 WORKER_TRANSACTION_SUBMIT_RETRIES,
16 },
17 jobs::{
18 notification_handler, relayer_health_check_handler, token_swap_cron_handler,
19 token_swap_request_handler, transaction_cleanup_handler, transaction_request_handler,
20 transaction_status_handler, transaction_submission_handler, JobProducerTrait,
21 },
22 models::{
23 NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy, RelayerRepoModel,
24 SignerRepoModel, ThinDataAppState, TransactionRepoModel,
25 },
26 repositories::{
27 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
28 Repository, TransactionCounterTrait, TransactionRepository,
29 },
30};
31use apalis::prelude::*;
32
33use apalis::layers::retry::backoff::MakeBackoff;
34use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
35use apalis::layers::ErrorHandlingLayer;
36
37pub use tower::util::rng::HasherRng;
39
40use apalis_cron::CronStream;
41use eyre::Result;
42use std::{str::FromStr, time::Duration};
43use tokio::signal::unix::SignalKind;
44use tracing::{debug, error, info};
45
// Worker names. Each constant is passed to `WorkerBuilder::new` as the worker's name and to
// `ServerConfig::get_worker_concurrency` when resolving that worker's concurrency.
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
// Worker bound to `queue.transaction_status_queue`.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
// Worker bound to `queue.transaction_status_queue_evm`.
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
// Worker bound to `queue.transaction_status_queue_stellar`.
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
const NOTIFICATION_SENDER: &str = "notification_sender";
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
// Worker driven by a cron stream rather than a job queue.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
57
58fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
68 let maker = ExponentialBackoffMaker::new(
69 Duration::from_millis(initial_ms),
70 Duration::from_millis(max_ms),
71 jitter,
72 HasherRng::default(),
73 )?;
74
75 Ok(maker)
76}
77
78pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
79 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
80) -> Result<()>
81where
82 J: JobProducerTrait + Send + Sync + 'static,
83 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
84 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
85 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
86 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
87 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
88 TCR: TransactionCounterTrait + Send + Sync + 'static,
89 PR: PluginRepositoryTrait + Send + Sync + 'static,
90 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
91{
92 let queue = app_state.job_producer.get_queue().await?;
93
94 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
95 .layer(ErrorHandlingLayer::new())
96 .retry(
97 RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
98 .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
99 )
100 .enable_tracing()
101 .catch_panic()
102 .concurrency(ServerConfig::get_worker_concurrency(
103 TRANSACTION_REQUEST,
104 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
105 ))
106 .data(app_state.clone())
107 .backend(queue.transaction_request_queue.clone())
108 .build_fn(transaction_request_handler);
109
110 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
111 .layer(ErrorHandlingLayer::new())
112 .enable_tracing()
113 .catch_panic()
114 .retry(
115 RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
116 .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
117 )
118 .concurrency(ServerConfig::get_worker_concurrency(
119 TRANSACTION_SENDER,
120 DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
121 ))
122 .data(app_state.clone())
123 .backend(queue.transaction_submission_queue.clone())
124 .build_fn(transaction_submission_handler);
125
126 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
129 .layer(ErrorHandlingLayer::new())
130 .enable_tracing()
131 .catch_panic()
132 .retry(
133 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
134 .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
135 )
136 .concurrency(ServerConfig::get_worker_concurrency(
137 TRANSACTION_STATUS_CHECKER,
138 DEFAULT_CONCURRENCY_STATUS_CHECKER,
139 ))
140 .data(app_state.clone())
141 .backend(queue.transaction_status_queue.clone())
142 .build_fn(transaction_status_handler);
143
144 let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
147 .layer(ErrorHandlingLayer::new())
148 .enable_tracing()
149 .catch_panic()
150 .retry(
151 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
152 .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
153 )
154 .concurrency(ServerConfig::get_worker_concurrency(
155 TRANSACTION_STATUS_CHECKER_EVM,
156 DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
157 ))
158 .data(app_state.clone())
159 .backend(queue.transaction_status_queue_evm.clone())
160 .build_fn(transaction_status_handler);
161
162 let transaction_status_queue_worker_stellar =
165 WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
166 .layer(ErrorHandlingLayer::new())
167 .enable_tracing()
168 .catch_panic()
169 .retry(
170 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
171 .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
172 )
173 .concurrency(ServerConfig::get_worker_concurrency(
174 TRANSACTION_STATUS_CHECKER_STELLAR,
175 DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
176 ))
177 .data(app_state.clone())
178 .backend(queue.transaction_status_queue_stellar.clone())
179 .build_fn(transaction_status_handler);
180
181 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
182 .layer(ErrorHandlingLayer::new())
183 .enable_tracing()
184 .catch_panic()
185 .retry(
186 RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
187 .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
188 )
189 .concurrency(ServerConfig::get_worker_concurrency(
190 NOTIFICATION_SENDER,
191 DEFAULT_CONCURRENCY_NOTIFICATION,
192 ))
193 .data(app_state.clone())
194 .backend(queue.notification_queue.clone())
195 .build_fn(notification_handler);
196
197 let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
198 .layer(ErrorHandlingLayer::new())
199 .enable_tracing()
200 .catch_panic()
201 .retry(
202 RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
203 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
204 )
205 .concurrency(ServerConfig::get_worker_concurrency(
206 TOKEN_SWAP_REQUEST,
207 DEFAULT_CONCURRENCY_TOKEN_SWAP,
208 ))
209 .data(app_state.clone())
210 .backend(queue.token_swap_request_queue.clone())
211 .build_fn(token_swap_request_handler);
212
213 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
214 .layer(ErrorHandlingLayer::new())
215 .enable_tracing()
216 .catch_panic()
217 .retry(
218 RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
219 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
220 )
221 .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) .data(app_state.clone())
223 .backend(CronStream::new(
224 apalis_cron::Schedule::from_str("0 */30 * * * *")?,
226 ))
227 .build_fn(transaction_cleanup_handler);
228
229 let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
230 .layer(ErrorHandlingLayer::new())
231 .enable_tracing()
232 .catch_panic()
233 .retry(
234 RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
235 .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
236 )
237 .concurrency(ServerConfig::get_worker_concurrency(
238 RELAYER_HEALTH_CHECK,
239 DEFAULT_CONCURRENCY_HEALTH_CHECK,
240 ))
241 .data(app_state.clone())
242 .backend(queue.relayer_health_check_queue.clone())
243 .build_fn(relayer_health_check_handler);
244
245 let monitor = Monitor::new()
246 .register(transaction_request_queue_worker)
247 .register(transaction_submission_queue_worker)
248 .register(transaction_status_queue_worker)
249 .register(transaction_status_queue_worker_evm)
250 .register(transaction_status_queue_worker_stellar)
251 .register(notification_queue_worker)
252 .register(token_swap_request_queue_worker)
253 .register(transaction_cleanup_queue_worker)
254 .register(relayer_health_check_worker)
255 .on_event(monitor_handle_event)
256 .shutdown_timeout(Duration::from_millis(5000));
257
258 let monitor_future = monitor.run_with_signal(async {
259 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
260 .expect("Failed to create SIGINT signal");
261 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
262 .expect("Failed to create SIGTERM signal");
263
264 debug!("Workers monitor started");
265
266 tokio::select! {
267 _ = sigint.recv() => debug!("Received SIGINT."),
268 _ = sigterm.recv() => debug!("Received SIGTERM."),
269 };
270
271 debug!("Workers monitor shutting down");
272
273 Ok(())
274 });
275 tokio::spawn(async move {
276 if let Err(e) = monitor_future.await {
277 error!(error = %e, "monitor error");
278 }
279 });
280 debug!("Workers monitor shutdown complete");
281
282 Ok(())
283}
284
285fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
291 relayers
292 .into_iter()
293 .filter(|relayer| {
294 match &relayer.policies {
295 RelayerNetworkPolicy::Solana(policy) => {
296 let swap_config = match policy.get_swap_config() {
297 Some(config) => config,
298 None => {
299 debug!(relayer_id = %relayer.id, "No Solana swap configuration specified; skipping");
300 return false;
301 }
302 };
303
304 if swap_config.cron_schedule.is_none() {
305 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
306 return false;
307 }
308 true
309 }
310 RelayerNetworkPolicy::Stellar(policy) => {
311 let swap_config = match policy.get_swap_config() {
312 Some(config) => config,
313 None => {
314 debug!(relayer_id = %relayer.id, "No Stellar swap configuration specified; skipping");
315 return false;
316 }
317 };
318
319 if swap_config.cron_schedule.is_none() {
320 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
321 return false;
322 }
323 true
324 }
325 _ => {
326 debug!(relayer_id = %relayer.id, "Network type does not support swap; skipping");
327 false
328 }
329 }
330 })
331 .collect()
332}
333
334pub async fn initialize_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
337 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
338) -> Result<()>
339where
340 J: JobProducerTrait + Send + Sync + 'static,
341 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
342 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
343 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
344 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
345 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
346 TCR: TransactionCounterTrait + Send + Sync + 'static,
347 PR: PluginRepositoryTrait + Send + Sync + 'static,
348 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
349{
350 let active_relayers = app_state.relayer_repository.list_active().await?;
351 let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
352
353 if relayers_with_swap_enabled.is_empty() {
354 debug!("No relayers with swap enabled");
355 return Ok(());
356 }
357 info!(
358 "Found {} relayers with swap enabled",
359 relayers_with_swap_enabled.len()
360 );
361
362 let mut workers = Vec::new();
363
364 let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
365
366 for relayer in relayers_with_swap_enabled {
367 debug!(relayer = ?relayer, "found relayer with swap enabled");
368
369 let (cron_schedule, network_type) = match &relayer.policies {
370 RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
371 Some(config) => match config.cron_schedule {
372 Some(schedule) => (schedule, "solana".to_string()),
373 None => {
374 debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
375 continue;
376 }
377 },
378 None => {
379 debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
380 continue;
381 }
382 },
383 RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
384 Some(config) => match config.cron_schedule {
385 Some(schedule) => (schedule, "stellar".to_string()),
386 None => {
387 debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
388 continue;
389 }
390 },
391 None => {
392 debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
393 continue;
394 }
395 },
396 RelayerNetworkPolicy::Evm(_) => {
397 debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
398 continue;
399 }
400 };
401
402 let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
403 Ok(schedule) => schedule,
404 Err(e) => {
405 error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
406 continue;
407 }
408 };
409
410 let worker = WorkerBuilder::new(format!(
412 "{}-swap-schedule-{}",
413 network_type,
414 relayer.id.clone()
415 ))
416 .layer(ErrorHandlingLayer::new())
417 .enable_tracing()
418 .catch_panic()
419 .retry(
420 RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
421 .with_backoff(swap_backoff.clone()),
422 )
423 .concurrency(1)
424 .data(relayer.id.clone())
425 .data(app_state.clone())
426 .backend(CronStream::new(calendar_schedule))
427 .build_fn(token_swap_cron_handler);
428
429 workers.push(worker);
430 debug!(
431 relayer_id = %relayer.id,
432 network_type = %network_type,
433 "Created worker for relayer with swap enabled"
434 );
435 }
436
437 let mut monitor = Monitor::new()
438 .on_event(monitor_handle_event)
439 .shutdown_timeout(Duration::from_millis(5000));
440
441 for worker in workers {
443 monitor = monitor.register(worker);
444 }
445
446 let monitor_future = monitor.run_with_signal(async {
447 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
448 .expect("Failed to create SIGINT signal");
449 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
450 .expect("Failed to create SIGTERM signal");
451
452 debug!("Swap Monitor started");
453
454 tokio::select! {
455 _ = sigint.recv() => debug!("Received SIGINT."),
456 _ = sigterm.recv() => debug!("Received SIGTERM."),
457 };
458
459 debug!("Swap Monitor shutting down");
460
461 Ok(())
462 });
463 tokio::spawn(async move {
464 if let Err(e) = monitor_future.await {
465 error!(error = %e, "monitor error");
466 }
467 });
468 Ok(())
469}
470
471fn monitor_handle_event(e: Worker<Event>) {
472 let worker_id = e.id();
473 match e.inner() {
474 Event::Engage(task_id) => {
475 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
476 }
477 Event::Error(e) => {
478 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
479 }
480 Event::Exit => {
481 debug!(worker_id = %worker_id, "worker exited");
482 }
483 Event::Idle => {
484 debug!(worker_id = %worker_id, "worker is idle");
485 }
486 Event::Start => {
487 debug!(worker_id = %worker_id, "worker started");
488 }
489 Event::Stop => {
490 debug!(worker_id = %worker_id, "worker stopped");
491 }
492 _ => {}
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
        RelayerSolanaSwapConfig, RelayerStellarPolicy, RelayerStellarSwapConfig,
        StellarFeePaymentStrategy, StellarSwapStrategy,
    };

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Wraps a cron expression as the `Option<String>` the fixture builders take.
    fn cron(expr: &str) -> Option<String> {
        Some(expr.to_string())
    }

    /// Collects the ids of `relayers`, keeping their order.
    fn ids_of(relayers: &[RelayerRepoModel]) -> Vec<&str> {
        relayers.iter().map(|relayer| relayer.id.as_str()).collect()
    }

    /// EVM relayer with the default EVM policy.
    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("EVM Relayer {}", id),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Solana relayer whose policy carries exactly the given swap configuration.
    fn solana_relayer(id: &str, swap_config: Option<RelayerSolanaSwapConfig>) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Solana Relayer {}", id),
            network: "mainnet-beta".to_string(),
            paused: false,
            network_type: NetworkType::Solana,
            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
                min_balance: Some(1000000000),
                allowed_tokens: None,
                allowed_programs: None,
                max_signatures: None,
                max_tx_data_size: None,
                fee_payment_strategy: None,
                fee_margin_percentage: None,
                allowed_accounts: None,
                disallowed_accounts: None,
                max_allowed_fee_lamports: None,
                swap_config,
            }),
            signer_id: "test-signer".to_string(),
            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Stellar relayer whose policy carries exactly the given swap configuration.
    fn stellar_relayer(
        id: &str,
        swap_config: Option<RelayerStellarSwapConfig>,
    ) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Stellar Relayer {}", id),
            network: "testnet".to_string(),
            paused: false,
            network_type: NetworkType::Stellar,
            policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
                min_balance: Some(1000000000),
                max_fee: None,
                timeout_seconds: None,
                concurrent_transactions: None,
                allowed_tokens: None,
                fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
                slippage_percentage: None,
                fee_margin_percentage: None,
                swap_config,
            }),
            signer_id: "test-signer".to_string(),
            address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Solana relayer with a swap configuration and an optional cron schedule.
    fn create_test_solana_relayer_with_swap(
        id: &str,
        cron_schedule: Option<String>,
    ) -> RelayerRepoModel {
        solana_relayer(
            id,
            Some(RelayerSolanaSwapConfig {
                strategy: None,
                cron_schedule,
                min_balance_threshold: Some(5000000000),
                jupiter_swap_options: None,
            }),
        )
    }

    /// Stellar relayer with a swap configuration and an optional cron schedule.
    fn create_test_stellar_relayer_with_swap(
        id: &str,
        cron_schedule: Option<String>,
    ) -> RelayerRepoModel {
        stellar_relayer(
            id,
            Some(RelayerStellarSwapConfig {
                strategies: vec![StellarSwapStrategy::OrderBook],
                cron_schedule,
                min_balance_threshold: Some(5000000000),
            }),
        )
    }

    /// Solana relayer whose policy has no swap configuration at all.
    fn create_test_solana_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
        solana_relayer(id, None)
    }

    /// Stellar relayer whose policy has no swap configuration at all.
    fn create_test_stellar_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
        stellar_relayer(id, None)
    }

    // ---------------------------------------------------------------------
    // filter_relayers_for_swap
    // ---------------------------------------------------------------------

    #[test]
    fn test_filter_relayers_for_swap_with_empty_list() {
        let filtered = filter_relayers_for_swap(Vec::new());

        assert_eq!(
            filtered.len(),
            0,
            "Should return empty list when no relayers provided"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_non_solana_stellar() {
        let input: Vec<RelayerRepoModel> = ["evm-1", "evm-2"]
            .iter()
            .map(|id| create_test_evm_relayer(id))
            .collect();

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out all non-Solana/Stellar relayers"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
        let input = vec![
            create_test_solana_relayer_with_swap("solana-1", None),
            create_test_solana_relayer_with_swap("solana-2", None),
            create_test_stellar_relayer_with_swap("stellar-1", None),
            create_test_stellar_relayer_with_swap("stellar-2", None),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out Solana and Stellar relayers without cron schedule"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_includes_valid_relayers() {
        let input = vec![
            create_test_solana_relayer_with_swap("solana-1", cron("0 0 * * * *")),
            create_test_solana_relayer_with_swap("solana-2", cron("0 */2 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-1", cron("0 0 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-2", cron("0 */2 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            4,
            "Should include all Solana and Stellar relayers with cron schedule"
        );
        let ids = ids_of(&filtered);
        for expected in ["solana-1", "solana-2", "stellar-1", "stellar-2"] {
            assert!(ids.contains(&expected), "Should include {}", expected);
        }
    }

    #[test]
    fn test_filter_relayers_for_swap_with_mixed_relayers() {
        let input = vec![
            create_test_evm_relayer("evm-1"),
            create_test_solana_relayer_with_swap("solana-no-cron", None),
            create_test_solana_relayer_with_swap("solana-with-cron-1", cron("0 0 * * * *")),
            create_test_evm_relayer("evm-2"),
            create_test_solana_relayer_with_swap("solana-with-cron-2", cron("0 */3 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
            create_test_stellar_relayer_with_swap("stellar-with-cron-1", cron("0 0 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-with-cron-2", cron("0 */3 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            4,
            "Should only include Solana and Stellar relayers with cron schedule"
        );

        let ids = ids_of(&filtered);

        let kept = [
            "solana-with-cron-1",
            "solana-with-cron-2",
            "stellar-with-cron-1",
            "stellar-with-cron-2",
        ];
        for expected in kept {
            assert!(ids.contains(&expected), "Should include {}", expected);
        }

        let dropped = [
            ("evm-1", "Should not include EVM relayers"),
            ("solana-no-cron", "Should not include Solana without cron"),
            ("stellar-no-cron", "Should not include Stellar without cron"),
        ];
        for (rejected, message) in dropped {
            assert!(!ids.contains(&rejected), "{}", message);
        }
    }

    #[test]
    fn test_filter_relayers_for_swap_preserves_solana_relayer_data() {
        let schedule = "0 1 * * * *".to_string();
        let input = vec![create_test_solana_relayer_with_swap(
            "test-relayer",
            Some(schedule.clone()),
        )];

        let filtered = filter_relayers_for_swap(input);
        assert_eq!(filtered.len(), 1);

        let kept = &filtered[0];
        assert_eq!(kept.id, "test-relayer");
        assert_eq!(kept.name, "Solana Relayer test-relayer");
        assert_eq!(kept.network_type, NetworkType::Solana);

        // The swap configuration must come through the filter untouched.
        let policy = kept.policies.get_solana_policy();
        let swap_config = policy.get_swap_config().expect("Should have swap config");
        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&schedule));
    }

    #[test]
    fn test_filter_relayers_for_swap_preserves_stellar_relayer_data() {
        let schedule = "0 1 * * * *".to_string();
        let input = vec![create_test_stellar_relayer_with_swap(
            "test-relayer",
            Some(schedule.clone()),
        )];

        let filtered = filter_relayers_for_swap(input);
        assert_eq!(filtered.len(), 1);

        let kept = &filtered[0];
        assert_eq!(kept.id, "test-relayer");
        assert_eq!(kept.name, "Stellar Relayer test-relayer");
        assert_eq!(kept.network_type, NetworkType::Stellar);

        // The swap configuration must come through the filter untouched.
        let policy = kept.policies.get_stellar_policy();
        let swap_config = policy.get_swap_config().expect("Should have swap config");
        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&schedule));
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_solana_without_swap_config() {
        let input = vec![
            create_test_solana_relayer_without_swap_config("solana-1"),
            create_test_solana_relayer_without_swap_config("solana-2"),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out Solana relayers without swap config"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_stellar_without_swap_config() {
        let input = vec![
            create_test_stellar_relayer_without_swap_config("stellar-1"),
            create_test_stellar_relayer_without_swap_config("stellar-2"),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out Stellar relayers without swap config"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_with_mixed_swap_configs() {
        let input = vec![
            create_test_solana_relayer_without_swap_config("solana-no-config"),
            create_test_solana_relayer_with_swap("solana-no-cron", None),
            create_test_solana_relayer_with_swap("solana-with-cron", cron("0 0 * * * *")),
            create_test_stellar_relayer_without_swap_config("stellar-no-config"),
            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
            create_test_stellar_relayer_with_swap("stellar-with-cron", cron("0 0 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            2,
            "Should only include relayers with swap config and cron schedule"
        );

        let ids = ids_of(&filtered);

        for expected in ["solana-with-cron", "stellar-with-cron"] {
            assert!(ids.contains(&expected), "Should include {}", expected);
        }

        let dropped = [
            ("solana-no-config", "Should not include solana without config"),
            ("solana-no-cron", "Should not include solana without cron"),
            ("stellar-no-config", "Should not include stellar without config"),
            ("stellar-no-cron", "Should not include stellar without cron"),
        ];
        for (rejected, message) in dropped {
            assert!(!ids.contains(&rejected), "{}", message);
        }
    }

    // ---------------------------------------------------------------------
    // create_backoff
    // ---------------------------------------------------------------------

    #[test]
    fn test_create_backoff_with_valid_parameters() {
        let outcome = create_backoff(200, 5000, 0.99);
        assert!(
            outcome.is_ok(),
            "Should create backoff with valid parameters"
        );
    }

    #[test]
    fn test_create_backoff_with_zero_initial() {
        let outcome = create_backoff(0, 5000, 0.99);
        assert!(
            outcome.is_ok(),
            "Should handle zero initial delay (edge case)"
        );
    }

    #[test]
    fn test_create_backoff_with_equal_initial_and_max() {
        let outcome = create_backoff(1000, 1000, 0.5);
        assert!(outcome.is_ok(), "Should handle equal initial and max delays");
    }

    #[test]
    fn test_create_backoff_with_zero_jitter() {
        let outcome = create_backoff(500, 5000, 0.0);
        assert!(outcome.is_ok(), "Should handle zero jitter");
    }

    #[test]
    fn test_create_backoff_with_max_jitter() {
        let outcome = create_backoff(500, 5000, 1.0);
        assert!(outcome.is_ok(), "Should handle maximum jitter (1.0)");
    }

    #[test]
    fn test_create_backoff_with_small_values() {
        let outcome = create_backoff(1, 10, 0.5);
        assert!(outcome.is_ok(), "Should handle very small delay values");
    }

    #[test]
    fn test_create_backoff_with_large_values() {
        let outcome = create_backoff(10000, 60000, 0.99);
        assert!(outcome.is_ok(), "Should handle large delay values");
    }

    // ---------------------------------------------------------------------
    // Ordering, schedule formats and network coverage
    // ---------------------------------------------------------------------

    #[test]
    fn test_filter_relayers_preserves_order() {
        let input = vec![
            create_test_solana_relayer_with_swap("solana-1", cron("0 0 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-1", cron("0 0 * * * *")),
            create_test_solana_relayer_with_swap("solana-2", cron("0 0 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-2", cron("0 0 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(filtered.len(), 4);
        assert_eq!(filtered[0].id, "solana-1");
        assert_eq!(filtered[1].id, "stellar-1");
        assert_eq!(filtered[2].id, "solana-2");
        assert_eq!(filtered[3].id, "stellar-2");
    }

    #[test]
    fn test_filter_relayers_with_different_cron_formats() {
        let input = vec![
            create_test_solana_relayer_with_swap("solana-1", cron("0 0 * * * *")),
            create_test_solana_relayer_with_swap("solana-2", cron("*/5 * * * * *")),
            create_test_stellar_relayer_with_swap("stellar-1", cron("0 0 12 * * *")),
            create_test_stellar_relayer_with_swap("stellar-2", cron("0 */15 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(
            filtered.len(),
            4,
            "Should accept various valid cron schedule formats"
        );
    }

    #[test]
    fn test_filter_relayers_with_all_network_types() {
        let input = vec![
            create_test_evm_relayer("evm-1"),
            create_test_solana_relayer_with_swap("solana-1", cron("0 0 * * * *")),
            create_test_stellar_relayer_with_swap("stellar-1", cron("0 0 * * * *")),
        ];

        let filtered = filter_relayers_for_swap(input);

        assert_eq!(filtered.len(), 2, "Should only include Solana and Stellar");

        let network_types: Vec<NetworkType> = filtered
            .iter()
            .map(|relayer| relayer.network_type.clone())
            .collect();
        assert!(
            network_types.contains(&NetworkType::Solana),
            "Should include Solana"
        );
        assert!(
            network_types.contains(&NetworkType::Stellar),
            "Should include Stellar"
        );
        assert!(
            !network_types.contains(&NetworkType::Evm),
            "Should not include EVM"
        );
    }
}