openzeppelin_relayer/bootstrap/
initialize_workers.rs

1//! Worker initialization
2//!
3//! This module contains functions for initializing background workers,
4//! including job processors and other long-running tasks.
5use crate::{
6    config::ServerConfig,
7    constants::{
8        DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9        DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
10        DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR, DEFAULT_CONCURRENCY_TOKEN_SWAP,
11        DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12        WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13        WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_CLEANUP_RETRIES,
14        WORKER_TRANSACTION_REQUEST_RETRIES, WORKER_TRANSACTION_STATUS_CHECKER_RETRIES,
15        WORKER_TRANSACTION_SUBMIT_RETRIES,
16    },
17    jobs::{
18        notification_handler, relayer_health_check_handler, token_swap_cron_handler,
19        token_swap_request_handler, transaction_cleanup_handler, transaction_request_handler,
20        transaction_status_handler, transaction_submission_handler, JobProducerTrait,
21    },
22    models::{
23        NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy, RelayerRepoModel,
24        SignerRepoModel, ThinDataAppState, TransactionRepoModel,
25    },
26    repositories::{
27        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
28        Repository, TransactionCounterTrait, TransactionRepository,
29    },
30};
31use apalis::prelude::*;
32
33use apalis::layers::retry::backoff::MakeBackoff;
34use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
35use apalis::layers::ErrorHandlingLayer;
36
37/// Re-exports from [`tower::util`]
38pub use tower::util::rng::HasherRng;
39
40use apalis_cron::CronStream;
41use eyre::Result;
42use std::{str::FromStr, time::Duration};
43use tokio::signal::unix::SignalKind;
44use tracing::{debug, error, info};
45
46const TRANSACTION_REQUEST: &str = "transaction_request";
47const TRANSACTION_SENDER: &str = "transaction_sender";
48// Generic transaction status checker
49const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
50// Network specific status checkers
51const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
52const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
53const NOTIFICATION_SENDER: &str = "notification_sender";
54const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
55const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
56const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
57
58/// Creates an exponential backoff with configurable parameters
59///
60/// # Arguments
61/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
62/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
63/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
64///
65/// # Returns
66/// A configured backoff instance ready for use with RetryPolicy
67fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
68    let maker = ExponentialBackoffMaker::new(
69        Duration::from_millis(initial_ms),
70        Duration::from_millis(max_ms),
71        jitter,
72        HasherRng::default(),
73    )?;
74
75    Ok(maker)
76}
77
78pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
79    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
80) -> Result<()>
81where
82    J: JobProducerTrait + Send + Sync + 'static,
83    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
84    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
85    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
86    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
87    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
88    TCR: TransactionCounterTrait + Send + Sync + 'static,
89    PR: PluginRepositoryTrait + Send + Sync + 'static,
90    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
91{
92    let queue = app_state.job_producer.get_queue().await?;
93
94    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
95        .layer(ErrorHandlingLayer::new())
96        .retry(
97            RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
98                .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
99        )
100        .enable_tracing()
101        .catch_panic()
102        .concurrency(ServerConfig::get_worker_concurrency(
103            TRANSACTION_REQUEST,
104            DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
105        ))
106        .data(app_state.clone())
107        .backend(queue.transaction_request_queue.clone())
108        .build_fn(transaction_request_handler);
109
110    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
111        .layer(ErrorHandlingLayer::new())
112        .enable_tracing()
113        .catch_panic()
114        .retry(
115            RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
116                .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
117        )
118        .concurrency(ServerConfig::get_worker_concurrency(
119            TRANSACTION_SENDER,
120            DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
121        ))
122        .data(app_state.clone())
123        .backend(queue.transaction_submission_queue.clone())
124        .build_fn(transaction_submission_handler);
125
126    // Generic status checker
127    // Uses medium settings that work reasonably for most chains
128    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
129        .layer(ErrorHandlingLayer::new())
130        .enable_tracing()
131        .catch_panic()
132        .retry(
133            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
134                .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
135        )
136        .concurrency(ServerConfig::get_worker_concurrency(
137            TRANSACTION_STATUS_CHECKER,
138            DEFAULT_CONCURRENCY_STATUS_CHECKER,
139        ))
140        .data(app_state.clone())
141        .backend(queue.transaction_status_queue.clone())
142        .build_fn(transaction_status_handler);
143
144    // EVM status checker - slower retries to avoid premature resubmission
145    // EVM has longer block times (~12s) and needs time for resubmission logic
146    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
147        .layer(ErrorHandlingLayer::new())
148        .enable_tracing()
149        .catch_panic()
150        .retry(
151            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
152                .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
153        )
154        .concurrency(ServerConfig::get_worker_concurrency(
155            TRANSACTION_STATUS_CHECKER_EVM,
156            DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
157        ))
158        .data(app_state.clone())
159        .backend(queue.transaction_status_queue_evm.clone())
160        .build_fn(transaction_status_handler);
161
162    // Stellar status checker - fast retries for fast finality
163    // Stellar has sub-second finality, needs more frequent status checks
164    let transaction_status_queue_worker_stellar =
165        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
166            .layer(ErrorHandlingLayer::new())
167            .enable_tracing()
168            .catch_panic()
169            .retry(
170                RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
171                    .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
172            )
173            .concurrency(ServerConfig::get_worker_concurrency(
174                TRANSACTION_STATUS_CHECKER_STELLAR,
175                DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
176            ))
177            .data(app_state.clone())
178            .backend(queue.transaction_status_queue_stellar.clone())
179            .build_fn(transaction_status_handler);
180
181    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
182        .layer(ErrorHandlingLayer::new())
183        .enable_tracing()
184        .catch_panic()
185        .retry(
186            RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
187                .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
188        )
189        .concurrency(ServerConfig::get_worker_concurrency(
190            NOTIFICATION_SENDER,
191            DEFAULT_CONCURRENCY_NOTIFICATION,
192        ))
193        .data(app_state.clone())
194        .backend(queue.notification_queue.clone())
195        .build_fn(notification_handler);
196
197    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
198        .layer(ErrorHandlingLayer::new())
199        .enable_tracing()
200        .catch_panic()
201        .retry(
202            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
203                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
204        )
205        .concurrency(ServerConfig::get_worker_concurrency(
206            TOKEN_SWAP_REQUEST,
207            DEFAULT_CONCURRENCY_TOKEN_SWAP,
208        ))
209        .data(app_state.clone())
210        .backend(queue.token_swap_request_queue.clone())
211        .build_fn(token_swap_request_handler);
212
213    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
214        .layer(ErrorHandlingLayer::new())
215        .enable_tracing()
216        .catch_panic()
217        .retry(
218            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
219                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
220        )
221        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
222        .data(app_state.clone())
223        .backend(CronStream::new(
224            // every 30 minutes
225            apalis_cron::Schedule::from_str("0 */30 * * * *")?,
226        ))
227        .build_fn(transaction_cleanup_handler);
228
229    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
230        .layer(ErrorHandlingLayer::new())
231        .enable_tracing()
232        .catch_panic()
233        .retry(
234            RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
235                .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
236        )
237        .concurrency(ServerConfig::get_worker_concurrency(
238            RELAYER_HEALTH_CHECK,
239            DEFAULT_CONCURRENCY_HEALTH_CHECK,
240        ))
241        .data(app_state.clone())
242        .backend(queue.relayer_health_check_queue.clone())
243        .build_fn(relayer_health_check_handler);
244
245    let monitor = Monitor::new()
246        .register(transaction_request_queue_worker)
247        .register(transaction_submission_queue_worker)
248        .register(transaction_status_queue_worker)
249        .register(transaction_status_queue_worker_evm)
250        .register(transaction_status_queue_worker_stellar)
251        .register(notification_queue_worker)
252        .register(token_swap_request_queue_worker)
253        .register(transaction_cleanup_queue_worker)
254        .register(relayer_health_check_worker)
255        .on_event(monitor_handle_event)
256        .shutdown_timeout(Duration::from_millis(5000));
257
258    let monitor_future = monitor.run_with_signal(async {
259        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
260            .expect("Failed to create SIGINT signal");
261        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
262            .expect("Failed to create SIGTERM signal");
263
264        debug!("Workers monitor started");
265
266        tokio::select! {
267            _ = sigint.recv() => debug!("Received SIGINT."),
268            _ = sigterm.recv() => debug!("Received SIGTERM."),
269        };
270
271        debug!("Workers monitor shutting down");
272
273        Ok(())
274    });
275    tokio::spawn(async move {
276        if let Err(e) = monitor_future.await {
277            error!(error = %e, "monitor error");
278        }
279    });
280    debug!("Workers monitor shutdown complete");
281
282    Ok(())
283}
284
285/// Filters relayers to find those eligible for swap workers (Solana or Stellar)
286/// Returns relayers that have:
287/// 1. Solana or Stellar network type
288/// 2. Swap configuration
289/// 3. Cron schedule defined
290fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
291    relayers
292        .into_iter()
293        .filter(|relayer| {
294            match &relayer.policies {
295                RelayerNetworkPolicy::Solana(policy) => {
296                    let swap_config = match policy.get_swap_config() {
297                        Some(config) => config,
298                        None => {
299                            debug!(relayer_id = %relayer.id, "No Solana swap configuration specified; skipping");
300                            return false;
301                        }
302                    };
303
304                    if swap_config.cron_schedule.is_none() {
305                        debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
306                        return false;
307                    }
308                    true
309                }
310                RelayerNetworkPolicy::Stellar(policy) => {
311                    let swap_config = match policy.get_swap_config() {
312                        Some(config) => config,
313                        None => {
314                            debug!(relayer_id = %relayer.id, "No Stellar swap configuration specified; skipping");
315                            return false;
316                        }
317                    };
318
319                    if swap_config.cron_schedule.is_none() {
320                        debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
321                        return false;
322                    }
323                    true
324                }
325                _ => {
326                    debug!(relayer_id = %relayer.id, "Network type does not support swap; skipping");
327                    false
328                }
329            }
330        })
331        .collect()
332}
333
334/// Initializes swap workers for Solana and Stellar relayers
335/// This function creates and registers workers for relayers that have swap enabled and cron schedule set.
336pub async fn initialize_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
337    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
338) -> Result<()>
339where
340    J: JobProducerTrait + Send + Sync + 'static,
341    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
342    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
343    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
344    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
345    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
346    TCR: TransactionCounterTrait + Send + Sync + 'static,
347    PR: PluginRepositoryTrait + Send + Sync + 'static,
348    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
349{
350    let active_relayers = app_state.relayer_repository.list_active().await?;
351    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
352
353    if relayers_with_swap_enabled.is_empty() {
354        debug!("No relayers with swap enabled");
355        return Ok(());
356    }
357    info!(
358        "Found {} relayers with swap enabled",
359        relayers_with_swap_enabled.len()
360    );
361
362    let mut workers = Vec::new();
363
364    let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
365
366    for relayer in relayers_with_swap_enabled {
367        debug!(relayer = ?relayer, "found relayer with swap enabled");
368
369        let (cron_schedule, network_type) = match &relayer.policies {
370            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
371                Some(config) => match config.cron_schedule {
372                    Some(schedule) => (schedule, "solana".to_string()),
373                    None => {
374                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
375                        continue;
376                    }
377                },
378                None => {
379                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
380                    continue;
381                }
382            },
383            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
384                Some(config) => match config.cron_schedule {
385                    Some(schedule) => (schedule, "stellar".to_string()),
386                    None => {
387                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
388                        continue;
389                    }
390                },
391                None => {
392                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
393                    continue;
394                }
395            },
396            RelayerNetworkPolicy::Evm(_) => {
397                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
398                continue;
399            }
400        };
401
402        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
403            Ok(schedule) => schedule,
404            Err(e) => {
405                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
406                continue;
407            }
408        };
409
410        // Create worker and add to the workers vector
411        let worker = WorkerBuilder::new(format!(
412            "{}-swap-schedule-{}",
413            network_type,
414            relayer.id.clone()
415        ))
416        .layer(ErrorHandlingLayer::new())
417        .enable_tracing()
418        .catch_panic()
419        .retry(
420            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
421                .with_backoff(swap_backoff.clone()),
422        )
423        .concurrency(1)
424        .data(relayer.id.clone())
425        .data(app_state.clone())
426        .backend(CronStream::new(calendar_schedule))
427        .build_fn(token_swap_cron_handler);
428
429        workers.push(worker);
430        debug!(
431            relayer_id = %relayer.id,
432            network_type = %network_type,
433            "Created worker for relayer with swap enabled"
434        );
435    }
436
437    let mut monitor = Monitor::new()
438        .on_event(monitor_handle_event)
439        .shutdown_timeout(Duration::from_millis(5000));
440
441    // Register all workers with the monitor
442    for worker in workers {
443        monitor = monitor.register(worker);
444    }
445
446    let monitor_future = monitor.run_with_signal(async {
447        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
448            .expect("Failed to create SIGINT signal");
449        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
450            .expect("Failed to create SIGTERM signal");
451
452        debug!("Swap Monitor started");
453
454        tokio::select! {
455            _ = sigint.recv() => debug!("Received SIGINT."),
456            _ = sigterm.recv() => debug!("Received SIGTERM."),
457        };
458
459        debug!("Swap Monitor shutting down");
460
461        Ok(())
462    });
463    tokio::spawn(async move {
464        if let Err(e) = monitor_future.await {
465            error!(error = %e, "monitor error");
466        }
467    });
468    Ok(())
469}
470
471fn monitor_handle_event(e: Worker<Event>) {
472    let worker_id = e.id();
473    match e.inner() {
474        Event::Engage(task_id) => {
475            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
476        }
477        Event::Error(e) => {
478            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
479        }
480        Event::Exit => {
481            debug!(worker_id = %worker_id, "worker exited");
482        }
483        Event::Idle => {
484            debug!(worker_id = %worker_id, "worker is idle");
485        }
486        Event::Start => {
487            debug!(worker_id = %worker_id, "worker started");
488        }
489        Event::Stop => {
490            debug!(worker_id = %worker_id, "worker stopped");
491        }
492        _ => {}
493    }
494}
495
496#[cfg(test)]
497mod tests {
498    use super::*;
499    use crate::models::{
500        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
501        RelayerSolanaSwapConfig, RelayerStellarPolicy, RelayerStellarSwapConfig,
502        StellarFeePaymentStrategy, StellarSwapStrategy,
503    };
504
505    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
506        RelayerRepoModel {
507            id: id.to_string(),
508            name: format!("EVM Relayer {}", id),
509            network: "sepolia".to_string(),
510            paused: false,
511            network_type: NetworkType::Evm,
512            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
513            signer_id: "test-signer".to_string(),
514            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
515            system_disabled: false,
516            ..Default::default()
517        }
518    }
519
520    fn create_test_solana_relayer_with_swap(
521        id: &str,
522        cron_schedule: Option<String>,
523    ) -> RelayerRepoModel {
524        RelayerRepoModel {
525            id: id.to_string(),
526            name: format!("Solana Relayer {}", id),
527            network: "mainnet-beta".to_string(),
528            paused: false,
529            network_type: NetworkType::Solana,
530            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
531                min_balance: Some(1000000000),
532                allowed_tokens: None,
533                allowed_programs: None,
534                max_signatures: None,
535                max_tx_data_size: None,
536                fee_payment_strategy: None,
537                fee_margin_percentage: None,
538                allowed_accounts: None,
539                disallowed_accounts: None,
540                max_allowed_fee_lamports: None,
541                swap_config: Some(RelayerSolanaSwapConfig {
542                    strategy: None,
543                    cron_schedule,
544                    min_balance_threshold: Some(5000000000),
545                    jupiter_swap_options: None,
546                }),
547            }),
548            signer_id: "test-signer".to_string(),
549            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
550            system_disabled: false,
551            ..Default::default()
552        }
553    }
554
555    fn create_test_stellar_relayer_with_swap(
556        id: &str,
557        cron_schedule: Option<String>,
558    ) -> RelayerRepoModel {
559        RelayerRepoModel {
560            id: id.to_string(),
561            name: format!("Stellar Relayer {}", id),
562            network: "testnet".to_string(),
563            paused: false,
564            network_type: NetworkType::Stellar,
565            policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
566                min_balance: Some(1000000000),
567                max_fee: None,
568                timeout_seconds: None,
569                concurrent_transactions: None,
570                allowed_tokens: None,
571                fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
572                slippage_percentage: None,
573                fee_margin_percentage: None,
574                swap_config: Some(RelayerStellarSwapConfig {
575                    strategies: vec![StellarSwapStrategy::OrderBook],
576                    cron_schedule,
577                    min_balance_threshold: Some(5000000000),
578                }),
579            }),
580            signer_id: "test-signer".to_string(),
581            address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
582            system_disabled: false,
583            ..Default::default()
584        }
585    }
586
587    #[test]
588    fn test_filter_relayers_for_swap_with_empty_list() {
589        let relayers = vec![];
590        let filtered = filter_relayers_for_swap(relayers);
591
592        assert_eq!(
593            filtered.len(),
594            0,
595            "Should return empty list when no relayers provided"
596        );
597    }
598
599    #[test]
600    fn test_filter_relayers_for_swap_filters_non_solana_stellar() {
601        let relayers = vec![
602            create_test_evm_relayer("evm-1"),
603            create_test_evm_relayer("evm-2"),
604        ];
605
606        let filtered = filter_relayers_for_swap(relayers);
607
608        assert_eq!(
609            filtered.len(),
610            0,
611            "Should filter out all non-Solana/Stellar relayers"
612        );
613    }
614
615    #[test]
616    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
617        let relayers = vec![
618            create_test_solana_relayer_with_swap("solana-1", None),
619            create_test_solana_relayer_with_swap("solana-2", None),
620            create_test_stellar_relayer_with_swap("stellar-1", None),
621            create_test_stellar_relayer_with_swap("stellar-2", None),
622        ];
623
624        let filtered = filter_relayers_for_swap(relayers);
625
626        assert_eq!(
627            filtered.len(),
628            0,
629            "Should filter out Solana and Stellar relayers without cron schedule"
630        );
631    }
632
633    #[test]
634    fn test_filter_relayers_for_swap_includes_valid_relayers() {
635        let relayers = vec![
636            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
637            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string())),
638            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
639            create_test_stellar_relayer_with_swap("stellar-2", Some("0 */2 * * * *".to_string())),
640        ];
641
642        let filtered = filter_relayers_for_swap(relayers);
643
644        assert_eq!(
645            filtered.len(),
646            4,
647            "Should include all Solana and Stellar relayers with cron schedule"
648        );
649        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
650        assert!(ids.contains(&"solana-1"), "Should include solana-1");
651        assert!(ids.contains(&"solana-2"), "Should include solana-2");
652        assert!(ids.contains(&"stellar-1"), "Should include stellar-1");
653        assert!(ids.contains(&"stellar-2"), "Should include stellar-2");
654    }
655
656    #[test]
657    fn test_filter_relayers_for_swap_with_mixed_relayers() {
658        let relayers = vec![
659            create_test_evm_relayer("evm-1"),
660            create_test_solana_relayer_with_swap("solana-no-cron", None),
661            create_test_solana_relayer_with_swap(
662                "solana-with-cron-1",
663                Some("0 0 * * * *".to_string()),
664            ),
665            create_test_evm_relayer("evm-2"),
666            create_test_solana_relayer_with_swap(
667                "solana-with-cron-2",
668                Some("0 */3 * * * *".to_string()),
669            ),
670            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
671            create_test_stellar_relayer_with_swap(
672                "stellar-with-cron-1",
673                Some("0 0 * * * *".to_string()),
674            ),
675            create_test_stellar_relayer_with_swap(
676                "stellar-with-cron-2",
677                Some("0 */3 * * * *".to_string()),
678            ),
679        ];
680
681        let filtered = filter_relayers_for_swap(relayers);
682
683        assert_eq!(
684            filtered.len(),
685            4,
686            "Should only include Solana and Stellar relayers with cron schedule"
687        );
688
689        // Verify the correct relayers were included
690        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
691        assert!(
692            ids.contains(&"solana-with-cron-1"),
693            "Should include solana-with-cron-1"
694        );
695        assert!(
696            ids.contains(&"solana-with-cron-2"),
697            "Should include solana-with-cron-2"
698        );
699        assert!(
700            ids.contains(&"stellar-with-cron-1"),
701            "Should include stellar-with-cron-1"
702        );
703        assert!(
704            ids.contains(&"stellar-with-cron-2"),
705            "Should include stellar-with-cron-2"
706        );
707        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
708        assert!(
709            !ids.contains(&"solana-no-cron"),
710            "Should not include Solana without cron"
711        );
712        assert!(
713            !ids.contains(&"stellar-no-cron"),
714            "Should not include Stellar without cron"
715        );
716    }
717
718    #[test]
719    fn test_filter_relayers_for_swap_preserves_solana_relayer_data() {
720        let cron = "0 1 * * * *".to_string();
721        let relayers = vec![create_test_solana_relayer_with_swap(
722            "test-relayer",
723            Some(cron.clone()),
724        )];
725
726        let filtered = filter_relayers_for_swap(relayers);
727
728        assert_eq!(filtered.len(), 1);
729
730        let relayer = &filtered[0];
731        assert_eq!(relayer.id, "test-relayer");
732        assert_eq!(relayer.name, "Solana Relayer test-relayer");
733        assert_eq!(relayer.network_type, NetworkType::Solana);
734
735        // Verify swap config is preserved
736        let policy = relayer.policies.get_solana_policy();
737        let swap_config = policy.get_swap_config().expect("Should have swap config");
738        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
739    }
740
741    #[test]
742    fn test_filter_relayers_for_swap_preserves_stellar_relayer_data() {
743        let cron = "0 1 * * * *".to_string();
744        let relayers = vec![create_test_stellar_relayer_with_swap(
745            "test-relayer",
746            Some(cron.clone()),
747        )];
748
749        let filtered = filter_relayers_for_swap(relayers);
750
751        assert_eq!(filtered.len(), 1);
752
753        let relayer = &filtered[0];
754        assert_eq!(relayer.id, "test-relayer");
755        assert_eq!(relayer.name, "Stellar Relayer test-relayer");
756        assert_eq!(relayer.network_type, NetworkType::Stellar);
757
758        // Verify swap config is preserved
759        let policy = relayer.policies.get_stellar_policy();
760        let swap_config = policy.get_swap_config().expect("Should have swap config");
761        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
762    }
763
764    fn create_test_solana_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
765        RelayerRepoModel {
766            id: id.to_string(),
767            name: format!("Solana Relayer {}", id),
768            network: "mainnet-beta".to_string(),
769            paused: false,
770            network_type: NetworkType::Solana,
771            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
772                min_balance: Some(1000000000),
773                allowed_tokens: None,
774                allowed_programs: None,
775                max_signatures: None,
776                max_tx_data_size: None,
777                fee_payment_strategy: None,
778                fee_margin_percentage: None,
779                allowed_accounts: None,
780                disallowed_accounts: None,
781                max_allowed_fee_lamports: None,
782                swap_config: None, // No swap config
783            }),
784            signer_id: "test-signer".to_string(),
785            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
786            system_disabled: false,
787            ..Default::default()
788        }
789    }
790
791    fn create_test_stellar_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
792        RelayerRepoModel {
793            id: id.to_string(),
794            name: format!("Stellar Relayer {}", id),
795            network: "testnet".to_string(),
796            paused: false,
797            network_type: NetworkType::Stellar,
798            policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
799                min_balance: Some(1000000000),
800                max_fee: None,
801                timeout_seconds: None,
802                concurrent_transactions: None,
803                allowed_tokens: None,
804                fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
805                slippage_percentage: None,
806                fee_margin_percentage: None,
807                swap_config: None, // No swap config
808            }),
809            signer_id: "test-signer".to_string(),
810            address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
811            system_disabled: false,
812            ..Default::default()
813        }
814    }
815
816    #[test]
817    fn test_filter_relayers_for_swap_filters_solana_without_swap_config() {
818        let relayers = vec![
819            create_test_solana_relayer_without_swap_config("solana-1"),
820            create_test_solana_relayer_without_swap_config("solana-2"),
821        ];
822
823        let filtered = filter_relayers_for_swap(relayers);
824
825        assert_eq!(
826            filtered.len(),
827            0,
828            "Should filter out Solana relayers without swap config"
829        );
830    }
831
832    #[test]
833    fn test_filter_relayers_for_swap_filters_stellar_without_swap_config() {
834        let relayers = vec![
835            create_test_stellar_relayer_without_swap_config("stellar-1"),
836            create_test_stellar_relayer_without_swap_config("stellar-2"),
837        ];
838
839        let filtered = filter_relayers_for_swap(relayers);
840
841        assert_eq!(
842            filtered.len(),
843            0,
844            "Should filter out Stellar relayers without swap config"
845        );
846    }
847
848    #[test]
849    fn test_filter_relayers_for_swap_with_mixed_swap_configs() {
850        let relayers = vec![
851            create_test_solana_relayer_without_swap_config("solana-no-config"),
852            create_test_solana_relayer_with_swap("solana-no-cron", None),
853            create_test_solana_relayer_with_swap(
854                "solana-with-cron",
855                Some("0 0 * * * *".to_string()),
856            ),
857            create_test_stellar_relayer_without_swap_config("stellar-no-config"),
858            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
859            create_test_stellar_relayer_with_swap(
860                "stellar-with-cron",
861                Some("0 0 * * * *".to_string()),
862            ),
863        ];
864
865        let filtered = filter_relayers_for_swap(relayers);
866
867        assert_eq!(
868            filtered.len(),
869            2,
870            "Should only include relayers with swap config and cron schedule"
871        );
872
873        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
874        assert!(
875            ids.contains(&"solana-with-cron"),
876            "Should include solana-with-cron"
877        );
878        assert!(
879            ids.contains(&"stellar-with-cron"),
880            "Should include stellar-with-cron"
881        );
882        assert!(
883            !ids.contains(&"solana-no-config"),
884            "Should not include solana without config"
885        );
886        assert!(
887            !ids.contains(&"solana-no-cron"),
888            "Should not include solana without cron"
889        );
890        assert!(
891            !ids.contains(&"stellar-no-config"),
892            "Should not include stellar without config"
893        );
894        assert!(
895            !ids.contains(&"stellar-no-cron"),
896            "Should not include stellar without cron"
897        );
898    }
899
900    #[test]
901    fn test_create_backoff_with_valid_parameters() {
902        let result = create_backoff(200, 5000, 0.99);
903        assert!(
904            result.is_ok(),
905            "Should create backoff with valid parameters"
906        );
907    }
908
909    #[test]
910    fn test_create_backoff_with_zero_initial() {
911        let result = create_backoff(0, 5000, 0.99);
912        assert!(
913            result.is_ok(),
914            "Should handle zero initial delay (edge case)"
915        );
916    }
917
918    #[test]
919    fn test_create_backoff_with_equal_initial_and_max() {
920        let result = create_backoff(1000, 1000, 0.5);
921        assert!(result.is_ok(), "Should handle equal initial and max delays");
922    }
923
924    #[test]
925    fn test_create_backoff_with_zero_jitter() {
926        let result = create_backoff(500, 5000, 0.0);
927        assert!(result.is_ok(), "Should handle zero jitter");
928    }
929
930    #[test]
931    fn test_create_backoff_with_max_jitter() {
932        let result = create_backoff(500, 5000, 1.0);
933        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
934    }
935
936    #[test]
937    fn test_create_backoff_with_small_values() {
938        let result = create_backoff(1, 10, 0.5);
939        assert!(result.is_ok(), "Should handle very small delay values");
940    }
941
942    #[test]
943    fn test_create_backoff_with_large_values() {
944        let result = create_backoff(10000, 60000, 0.99);
945        assert!(result.is_ok(), "Should handle large delay values");
946    }
947
948    #[test]
949    fn test_filter_relayers_preserves_order() {
950        let relayers = vec![
951            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
952            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
953            create_test_solana_relayer_with_swap("solana-2", Some("0 0 * * * *".to_string())),
954            create_test_stellar_relayer_with_swap("stellar-2", Some("0 0 * * * *".to_string())),
955        ];
956
957        let filtered = filter_relayers_for_swap(relayers);
958
959        assert_eq!(filtered.len(), 4);
960        assert_eq!(filtered[0].id, "solana-1");
961        assert_eq!(filtered[1].id, "stellar-1");
962        assert_eq!(filtered[2].id, "solana-2");
963        assert_eq!(filtered[3].id, "stellar-2");
964    }
965
966    #[test]
967    fn test_filter_relayers_with_different_cron_formats() {
968        let relayers = vec![
969            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())), // Every hour
970            create_test_solana_relayer_with_swap("solana-2", Some("*/5 * * * * *".to_string())), // Every 5 seconds
971            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 12 * * *".to_string())), // Daily at noon
972            create_test_stellar_relayer_with_swap("stellar-2", Some("0 */15 * * * *".to_string())), // Every 15 minutes
973        ];
974
975        let filtered = filter_relayers_for_swap(relayers);
976
977        assert_eq!(
978            filtered.len(),
979            4,
980            "Should accept various valid cron schedule formats"
981        );
982    }
983
984    #[test]
985    fn test_filter_relayers_with_all_network_types() {
986        let relayers = vec![
987            create_test_evm_relayer("evm-1"),
988            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
989            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
990        ];
991
992        let filtered = filter_relayers_for_swap(relayers);
993
994        assert_eq!(filtered.len(), 2, "Should only include Solana and Stellar");
995
996        let network_types: Vec<NetworkType> =
997            filtered.iter().map(|r| r.network_type.clone()).collect();
998        assert!(
999            network_types.contains(&NetworkType::Solana),
1000            "Should include Solana"
1001        );
1002        assert!(
1003            network_types.contains(&NetworkType::Stellar),
1004            "Should include Stellar"
1005        );
1006        assert!(
1007            !network_types.contains(&NetworkType::Evm),
1008            "Should not include EVM"
1009        );
1010    }
1011}