openzeppelin_relayer/jobs/
job_producer.rs

1//! Job producer module for enqueueing jobs to Redis queues.
2//!
3//! Provides functionality for producing various types of jobs:
4//! - Transaction processing jobs
5//! - Transaction submission jobs
6//! - Status monitoring jobs
7//! - Notification jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, error};
24
25use super::{JobType, TokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize, Clone)]
31pub enum JobProducerError {
32    #[error("Queue error: {0}")]
33    QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37    fn from(_: RedisError) -> Self {
38        JobProducerError::QueueError("Queue error".to_string())
39    }
40}
41
42impl From<JobProducerError> for RelayerError {
43    fn from(_: JobProducerError) -> Self {
44        RelayerError::QueueError("Queue error".to_string())
45    }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50    queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54    fn clone(&self) -> Self {
55        // We can't clone the Mutex directly, but we can create a new one with a cloned Queue
56        // This requires getting the lock first
57        let queue = self
58            .queue
59            .try_lock()
60            .expect("Failed to lock queue for cloning")
61            .clone();
62
63        Self {
64            queue: Mutex::new(queue),
65        }
66    }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72    async fn produce_transaction_request_job(
73        &self,
74        transaction_process_job: TransactionRequest,
75        scheduled_on: Option<i64>,
76    ) -> Result<(), JobProducerError>;
77
78    async fn produce_submit_transaction_job(
79        &self,
80        transaction_submit_job: TransactionSend,
81        scheduled_on: Option<i64>,
82    ) -> Result<(), JobProducerError>;
83
84    async fn produce_check_transaction_status_job(
85        &self,
86        transaction_status_check_job: TransactionStatusCheck,
87        scheduled_on: Option<i64>,
88    ) -> Result<(), JobProducerError>;
89
90    async fn produce_send_notification_job(
91        &self,
92        notification_send_job: NotificationSend,
93        scheduled_on: Option<i64>,
94    ) -> Result<(), JobProducerError>;
95
96    async fn produce_token_swap_request_job(
97        &self,
98        swap_request_job: TokenSwapRequest,
99        scheduled_on: Option<i64>,
100    ) -> Result<(), JobProducerError>;
101
102    async fn produce_relayer_health_check_job(
103        &self,
104        relayer_health_check_job: RelayerHealthCheck,
105        scheduled_on: Option<i64>,
106    ) -> Result<(), JobProducerError>;
107
108    async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112    pub fn new(queue: Queue) -> Self {
113        Self {
114            queue: Mutex::new(queue.clone()),
115        }
116    }
117
118    pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119        let queue = self.queue.lock().await;
120
121        Ok(queue.clone())
122    }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127    async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128        let queue = self.queue.lock().await;
129
130        Ok(queue.clone())
131    }
132
133    async fn produce_transaction_request_job(
134        &self,
135        transaction_process_job: TransactionRequest,
136        scheduled_on: Option<i64>,
137    ) -> Result<(), JobProducerError> {
138        debug!(
139            "Producing transaction request job: {:?}",
140            transaction_process_job
141        );
142        let mut queue = self.queue.lock().await;
143        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144            .with_request_id(get_request_id());
145
146        match scheduled_on {
147            Some(scheduled_on) => {
148                queue
149                    .transaction_request_queue
150                    .schedule(job, scheduled_on)
151                    .await?;
152            }
153            None => {
154                queue.transaction_request_queue.push(job).await?;
155            }
156        }
157        debug!("Transaction job produced successfully");
158
159        Ok(())
160    }
161
162    async fn produce_submit_transaction_job(
163        &self,
164        transaction_submit_job: TransactionSend,
165        scheduled_on: Option<i64>,
166    ) -> Result<(), JobProducerError> {
167        let mut queue = self.queue.lock().await;
168        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169            .with_request_id(get_request_id());
170
171        match scheduled_on {
172            Some(on) => {
173                queue.transaction_submission_queue.schedule(job, on).await?;
174            }
175            None => {
176                queue.transaction_submission_queue.push(job).await?;
177            }
178        }
179        debug!("Transaction Submit job produced successfully");
180
181        Ok(())
182    }
183
184    async fn produce_check_transaction_status_job(
185        &self,
186        transaction_status_check_job: TransactionStatusCheck,
187        scheduled_on: Option<i64>,
188    ) -> Result<(), JobProducerError> {
189        let mut queue = self.queue.lock().await;
190        let job = Job::new(
191            JobType::TransactionStatusCheck,
192            transaction_status_check_job.clone(),
193        )
194        .with_request_id(get_request_id());
195
196        // Route to the appropriate queue based on network type
197        use crate::models::NetworkType;
198        let status_queue = match transaction_status_check_job.network_type {
199            Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
200            Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
201            _ => &mut queue.transaction_status_queue, // Generic queue or legacy messages without network_type
202        };
203
204        match scheduled_on {
205            Some(on) => {
206                status_queue.schedule(job, on).await?;
207            }
208            None => {
209                status_queue.push(job).await?;
210            }
211        }
212        debug!(
213            network_type = ?transaction_status_check_job.network_type,
214            "Transaction Status Check job produced successfully"
215        );
216        Ok(())
217    }
218
219    async fn produce_send_notification_job(
220        &self,
221        notification_send_job: NotificationSend,
222        scheduled_on: Option<i64>,
223    ) -> Result<(), JobProducerError> {
224        let mut queue = self.queue.lock().await;
225        let job = Job::new(JobType::NotificationSend, notification_send_job)
226            .with_request_id(get_request_id());
227
228        match scheduled_on {
229            Some(on) => {
230                queue.notification_queue.schedule(job, on).await?;
231            }
232            None => {
233                queue.notification_queue.push(job).await?;
234            }
235        }
236
237        debug!("Notification Send job produced successfully");
238        Ok(())
239    }
240
241    async fn produce_token_swap_request_job(
242        &self,
243        swap_request_job: TokenSwapRequest,
244        scheduled_on: Option<i64>,
245    ) -> Result<(), JobProducerError> {
246        let mut queue = self.queue.lock().await;
247        let job =
248            Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
249
250        match scheduled_on {
251            Some(on) => {
252                queue.token_swap_request_queue.schedule(job, on).await?;
253            }
254            None => {
255                queue.token_swap_request_queue.push(job).await?;
256            }
257        }
258
259        debug!("Token swap job produced successfully");
260        Ok(())
261    }
262
263    async fn produce_relayer_health_check_job(
264        &self,
265        relayer_health_check_job: RelayerHealthCheck,
266        scheduled_on: Option<i64>,
267    ) -> Result<(), JobProducerError> {
268        let job = Job::new(
269            JobType::RelayerHealthCheck,
270            relayer_health_check_job.clone(),
271        )
272        .with_request_id(get_request_id());
273
274        let mut queue = self.queue.lock().await;
275
276        match scheduled_on {
277            Some(scheduled_on) => {
278                queue
279                    .relayer_health_check_queue
280                    .schedule(job, scheduled_on)
281                    .await?;
282            }
283            None => {
284                queue.relayer_health_check_queue.push(job).await?;
285            }
286        }
287
288        Ok(())
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use crate::models::{
296        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
297        WebhookPayload, U256,
298    };
299    use crate::utils::calculate_scheduled_timestamp;
300
301    #[derive(Clone, Debug)]
302    // Define a simplified queue for testing without using complex mocks
303    struct TestRedisStorage<T> {
304        pub push_called: bool,
305        pub schedule_called: bool,
306        _phantom: std::marker::PhantomData<T>,
307    }
308
309    impl<T> TestRedisStorage<T> {
310        fn new() -> Self {
311            Self {
312                push_called: false,
313                schedule_called: false,
314                _phantom: std::marker::PhantomData,
315            }
316        }
317
318        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
319            self.push_called = true;
320            Ok(())
321        }
322
323        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
324            self.schedule_called = true;
325            Ok(())
326        }
327    }
328
329    // A test version of the Queue
330    #[derive(Clone, Debug)]
331    struct TestQueue {
332        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
333        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
334        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
335        pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
336        pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
337        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
338        pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
339        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
340    }
341
342    impl TestQueue {
343        fn new() -> Self {
344            Self {
345                transaction_request_queue: TestRedisStorage::new(),
346                transaction_submission_queue: TestRedisStorage::new(),
347                transaction_status_queue: TestRedisStorage::new(),
348                transaction_status_queue_evm: TestRedisStorage::new(),
349                transaction_status_queue_stellar: TestRedisStorage::new(),
350                notification_queue: TestRedisStorage::new(),
351                token_swap_request_queue: TestRedisStorage::new(),
352                relayer_health_check_queue: TestRedisStorage::new(),
353            }
354        }
355    }
356
357    // A test version of JobProducer
358    struct TestJobProducer {
359        queue: Mutex<TestQueue>,
360    }
361
362    impl Clone for TestJobProducer {
363        fn clone(&self) -> Self {
364            let queue = self
365                .queue
366                .try_lock()
367                .expect("Failed to lock queue for cloning")
368                .clone();
369            Self {
370                queue: Mutex::new(queue),
371            }
372        }
373    }
374
375    impl TestJobProducer {
376        fn new() -> Self {
377            Self {
378                queue: Mutex::new(TestQueue::new()),
379            }
380        }
381
382        async fn get_queue(&self) -> TestQueue {
383            self.queue.lock().await.clone()
384        }
385    }
386
387    #[async_trait]
388    impl JobProducerTrait for TestJobProducer {
389        async fn get_queue(&self) -> Result<Queue, JobProducerError> {
390            unimplemented!("get_queue not used in tests")
391        }
392
393        async fn produce_transaction_request_job(
394            &self,
395            transaction_process_job: TransactionRequest,
396            scheduled_on: Option<i64>,
397        ) -> Result<(), JobProducerError> {
398            let mut queue = self.queue.lock().await;
399            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
400
401            match scheduled_on {
402                Some(scheduled_on) => {
403                    queue
404                        .transaction_request_queue
405                        .schedule(job, scheduled_on)
406                        .await?;
407                }
408                None => {
409                    queue.transaction_request_queue.push(job).await?;
410                }
411            }
412
413            Ok(())
414        }
415
416        async fn produce_submit_transaction_job(
417            &self,
418            transaction_submit_job: TransactionSend,
419            scheduled_on: Option<i64>,
420        ) -> Result<(), JobProducerError> {
421            let mut queue = self.queue.lock().await;
422            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
423
424            match scheduled_on {
425                Some(on) => {
426                    queue.transaction_submission_queue.schedule(job, on).await?;
427                }
428                None => {
429                    queue.transaction_submission_queue.push(job).await?;
430                }
431            }
432
433            Ok(())
434        }
435
436        async fn produce_check_transaction_status_job(
437            &self,
438            transaction_status_check_job: TransactionStatusCheck,
439            scheduled_on: Option<i64>,
440        ) -> Result<(), JobProducerError> {
441            let mut queue = self.queue.lock().await;
442            let job = Job::new(
443                JobType::TransactionStatusCheck,
444                transaction_status_check_job.clone(),
445            );
446
447            // Route to the appropriate queue based on network type
448            use crate::models::NetworkType;
449            let status_queue = match transaction_status_check_job.network_type {
450                Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
451                Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
452                Some(NetworkType::Solana) => &mut queue.transaction_status_queue, // Use default queue
453                None => &mut queue.transaction_status_queue, // Legacy messages without network_type
454            };
455
456            match scheduled_on {
457                Some(on) => {
458                    status_queue.schedule(job, on).await?;
459                }
460                None => {
461                    status_queue.push(job).await?;
462                }
463            }
464
465            Ok(())
466        }
467
468        async fn produce_send_notification_job(
469            &self,
470            notification_send_job: NotificationSend,
471            scheduled_on: Option<i64>,
472        ) -> Result<(), JobProducerError> {
473            let mut queue = self.queue.lock().await;
474            let job = Job::new(JobType::NotificationSend, notification_send_job);
475
476            match scheduled_on {
477                Some(on) => {
478                    queue.notification_queue.schedule(job, on).await?;
479                }
480                None => {
481                    queue.notification_queue.push(job).await?;
482                }
483            }
484
485            Ok(())
486        }
487
488        async fn produce_token_swap_request_job(
489            &self,
490            swap_request_job: TokenSwapRequest,
491            scheduled_on: Option<i64>,
492        ) -> Result<(), JobProducerError> {
493            let mut queue = self.queue.lock().await;
494            let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
495
496            match scheduled_on {
497                Some(on) => {
498                    queue.token_swap_request_queue.schedule(job, on).await?;
499                }
500                None => {
501                    queue.token_swap_request_queue.push(job).await?;
502                }
503            }
504
505            Ok(())
506        }
507
508        async fn produce_relayer_health_check_job(
509            &self,
510            relayer_health_check_job: RelayerHealthCheck,
511            scheduled_on: Option<i64>,
512        ) -> Result<(), JobProducerError> {
513            let mut queue = self.queue.lock().await;
514            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
515
516            match scheduled_on {
517                Some(scheduled_on) => {
518                    queue
519                        .relayer_health_check_queue
520                        .schedule(job, scheduled_on)
521                        .await?;
522                }
523                None => {
524                    queue.relayer_health_check_queue.push(job).await?;
525                }
526            }
527
528            Ok(())
529        }
530    }
531
532    #[tokio::test]
533    async fn test_job_producer_operations() {
534        let producer = TestJobProducer::new();
535
536        // Test transaction request job
537        let request = TransactionRequest::new("tx123", "relayer-1");
538        let result = producer
539            .produce_transaction_request_job(request, None)
540            .await;
541        assert!(result.is_ok());
542
543        let queue = producer.get_queue().await;
544        assert!(queue.transaction_request_queue.push_called);
545
546        // Test scheduled job
547        let producer = TestJobProducer::new();
548        let request = TransactionRequest::new("tx123", "relayer-1");
549        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
550        let result = producer
551            .produce_transaction_request_job(request, Some(scheduled_timestamp))
552            .await;
553        assert!(result.is_ok());
554
555        let queue = producer.get_queue().await;
556        assert!(queue.transaction_request_queue.schedule_called);
557    }
558
559    #[tokio::test]
560    async fn test_submit_transaction_job() {
561        let producer = TestJobProducer::new();
562
563        // Test submit transaction job
564        let submit_job = TransactionSend::submit("tx123", "relayer-1");
565        let result = producer
566            .produce_submit_transaction_job(submit_job, None)
567            .await;
568        assert!(result.is_ok());
569
570        let queue = producer.get_queue().await;
571        assert!(queue.transaction_submission_queue.push_called);
572    }
573
574    #[tokio::test]
575    async fn test_check_status_job() {
576        use crate::models::NetworkType;
577        let producer = TestJobProducer::new();
578
579        // Test status check job for EVM
580        let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
581        let result = producer
582            .produce_check_transaction_status_job(status_job, None)
583            .await;
584        assert!(result.is_ok());
585
586        let queue = producer.get_queue().await;
587        assert!(queue.transaction_status_queue_evm.push_called);
588    }
589
590    #[tokio::test]
591    async fn test_notification_job() {
592        let producer = TestJobProducer::new();
593
594        // Create a simple notification for testing
595        let notification = WebhookNotification::new(
596            "test_event".to_string(),
597            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
598                EvmTransactionResponse {
599                    id: "tx123".to_string(),
600                    hash: Some("0x123".to_string()),
601                    status: TransactionStatus::Confirmed,
602                    status_reason: None,
603                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
604                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
605                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
606                    gas_price: Some(1000000000),
607                    gas_limit: Some(21000),
608                    nonce: Some(1),
609                    value: U256::from(1000000000000000000_u64),
610                    from: "0xabc".to_string(),
611                    to: Some("0xdef".to_string()),
612                    relayer_id: "relayer-1".to_string(),
613                    data: None,
614                    max_fee_per_gas: None,
615                    max_priority_fee_per_gas: None,
616                    signature: None,
617                    speed: None,
618                },
619            ))),
620        );
621        let job = NotificationSend::new("notification-1".to_string(), notification);
622
623        let result = producer.produce_send_notification_job(job, None).await;
624        assert!(result.is_ok());
625
626        let queue = producer.get_queue().await;
627        assert!(queue.notification_queue.push_called);
628    }
629
630    #[tokio::test]
631    async fn test_relayer_health_check_job() {
632        let producer = TestJobProducer::new();
633
634        // Test immediate health check job
635        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
636        let result = producer
637            .produce_relayer_health_check_job(health_check, None)
638            .await;
639        assert!(result.is_ok());
640
641        let queue = producer.get_queue().await;
642        assert!(queue.relayer_health_check_queue.push_called);
643
644        // Test scheduled health check job
645        let producer = TestJobProducer::new();
646        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
647        let scheduled_timestamp = calculate_scheduled_timestamp(60);
648        let result = producer
649            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
650            .await;
651        assert!(result.is_ok());
652
653        let queue = producer.get_queue().await;
654        assert!(queue.relayer_health_check_queue.schedule_called);
655    }
656
657    #[test]
658    fn test_job_producer_error_conversion() {
659        // Test error conversion without using specific Redis error types
660        let job_error = JobProducerError::QueueError("Test error".to_string());
661        let relayer_error: RelayerError = job_error.into();
662
663        match relayer_error {
664            RelayerError::QueueError(msg) => {
665                assert_eq!(msg, "Queue error");
666            }
667            _ => panic!("Unexpected error type"),
668        }
669    }
670
671    #[tokio::test]
672    async fn test_get_queue() {
673        let producer = TestJobProducer::new();
674
675        // Get the queue
676        let queue = producer.get_queue().await;
677
678        // Verify the queue is valid and has the expected structure
679        assert!(!queue.transaction_request_queue.push_called);
680        assert!(!queue.transaction_request_queue.schedule_called);
681        assert!(!queue.transaction_submission_queue.push_called);
682        assert!(!queue.notification_queue.push_called);
683        assert!(!queue.token_swap_request_queue.push_called);
684        assert!(!queue.relayer_health_check_queue.push_called);
685    }
686
687    #[tokio::test]
688    async fn test_produce_relayer_health_check_job_immediate() {
689        let producer = TestJobProducer::new();
690
691        // Test immediate health check job (no scheduling)
692        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
693        let result = producer
694            .produce_relayer_health_check_job(health_check, None)
695            .await;
696
697        // Should succeed
698        assert!(result.is_ok());
699
700        // Verify the job was pushed (not scheduled)
701        let queue = producer.get_queue().await;
702        assert!(queue.relayer_health_check_queue.push_called);
703        assert!(!queue.relayer_health_check_queue.schedule_called);
704
705        // Other queues should not be affected
706        assert!(!queue.transaction_request_queue.push_called);
707        assert!(!queue.transaction_submission_queue.push_called);
708        assert!(!queue.transaction_status_queue.push_called);
709        assert!(!queue.notification_queue.push_called);
710        assert!(!queue.token_swap_request_queue.push_called);
711    }
712
713    #[tokio::test]
714    async fn test_produce_relayer_health_check_job_scheduled() {
715        let producer = TestJobProducer::new();
716
717        // Test scheduled health check job
718        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
719        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
720        let result = producer
721            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
722            .await;
723
724        // Should succeed
725        assert!(result.is_ok());
726
727        // Verify the job was scheduled (not pushed)
728        let queue = producer.get_queue().await;
729        assert!(queue.relayer_health_check_queue.schedule_called);
730        assert!(!queue.relayer_health_check_queue.push_called);
731
732        // Other queues should not be affected
733        assert!(!queue.transaction_request_queue.push_called);
734        assert!(!queue.transaction_submission_queue.push_called);
735        assert!(!queue.transaction_status_queue.push_called);
736        assert!(!queue.notification_queue.push_called);
737        assert!(!queue.token_swap_request_queue.push_called);
738    }
739
740    #[tokio::test]
741    async fn test_produce_relayer_health_check_job_multiple_relayers() {
742        let producer = TestJobProducer::new();
743
744        // Produce health check jobs for multiple relayers
745        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
746
747        for relayer_id in &relayer_ids {
748            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
749            let result = producer
750                .produce_relayer_health_check_job(health_check, None)
751                .await;
752            assert!(result.is_ok());
753        }
754
755        // Verify jobs were produced
756        let queue = producer.get_queue().await;
757        assert!(queue.relayer_health_check_queue.push_called);
758    }
759
760    #[tokio::test]
761    async fn test_status_check_routes_to_evm_queue() {
762        use crate::models::NetworkType;
763        let producer = TestJobProducer::new();
764
765        let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
766        let result = producer
767            .produce_check_transaction_status_job(status_job, None)
768            .await;
769
770        assert!(result.is_ok());
771        let queue = producer.get_queue().await;
772        assert!(queue.transaction_status_queue_evm.push_called);
773        assert!(!queue.transaction_status_queue_stellar.push_called);
774        assert!(!queue.transaction_status_queue.push_called);
775    }
776
777    #[tokio::test]
778    async fn test_status_check_routes_to_stellar_queue() {
779        use crate::models::NetworkType;
780        let producer = TestJobProducer::new();
781
782        let status_job =
783            TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
784        let result = producer
785            .produce_check_transaction_status_job(status_job, None)
786            .await;
787
788        assert!(result.is_ok());
789        let queue = producer.get_queue().await;
790        assert!(queue.transaction_status_queue_stellar.push_called);
791        assert!(!queue.transaction_status_queue_evm.push_called);
792        assert!(!queue.transaction_status_queue.push_called);
793    }
794
795    #[tokio::test]
796    async fn test_status_check_routes_to_default_queue_for_solana() {
797        use crate::models::NetworkType;
798        let producer = TestJobProducer::new();
799
800        let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
801        let result = producer
802            .produce_check_transaction_status_job(status_job, None)
803            .await;
804
805        assert!(result.is_ok());
806        let queue = producer.get_queue().await;
807        assert!(queue.transaction_status_queue.push_called);
808        assert!(!queue.transaction_status_queue_evm.push_called);
809        assert!(!queue.transaction_status_queue_stellar.push_called);
810    }
811
812    #[tokio::test]
813    async fn test_status_check_scheduled_evm() {
814        use crate::models::NetworkType;
815        let producer = TestJobProducer::new();
816
817        let status_job =
818            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
819        let scheduled_timestamp = calculate_scheduled_timestamp(30);
820        let result = producer
821            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
822            .await;
823
824        assert!(result.is_ok());
825        let queue = producer.get_queue().await;
826        assert!(queue.transaction_status_queue_evm.schedule_called);
827        assert!(!queue.transaction_status_queue_evm.push_called);
828    }
829
830    #[tokio::test]
831    async fn test_submit_transaction_scheduled() {
832        let producer = TestJobProducer::new();
833
834        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
835        let scheduled_timestamp = calculate_scheduled_timestamp(15);
836        let result = producer
837            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
838            .await;
839
840        assert!(result.is_ok());
841        let queue = producer.get_queue().await;
842        assert!(queue.transaction_submission_queue.schedule_called);
843        assert!(!queue.transaction_submission_queue.push_called);
844    }
845
846    #[tokio::test]
847    async fn test_notification_job_scheduled() {
848        let producer = TestJobProducer::new();
849
850        let notification = WebhookNotification::new(
851            "test_scheduled_event".to_string(),
852            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
853                EvmTransactionResponse {
854                    id: "tx-notify-scheduled".to_string(),
855                    hash: Some("0xabc123".to_string()),
856                    status: TransactionStatus::Confirmed,
857                    status_reason: None,
858                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
859                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
860                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
861                    gas_price: Some(1000000000),
862                    gas_limit: Some(21000),
863                    nonce: Some(1),
864                    value: U256::from(1000000000000000000_u64),
865                    from: "0xabc".to_string(),
866                    to: Some("0xdef".to_string()),
867                    relayer_id: "relayer-1".to_string(),
868                    data: None,
869                    max_fee_per_gas: None,
870                    max_priority_fee_per_gas: None,
871                    signature: None,
872                    speed: None,
873                },
874            ))),
875        );
876        let job = NotificationSend::new("notification-scheduled".to_string(), notification);
877
878        let scheduled_timestamp = calculate_scheduled_timestamp(5);
879        let result = producer
880            .produce_send_notification_job(job, Some(scheduled_timestamp))
881            .await;
882
883        assert!(result.is_ok());
884        let queue = producer.get_queue().await;
885        assert!(queue.notification_queue.schedule_called);
886        assert!(!queue.notification_queue.push_called);
887    }
888
889    #[tokio::test]
890    async fn test_solana_swap_job_immediate() {
891        let producer = TestJobProducer::new();
892
893        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
894        let result = producer
895            .produce_token_swap_request_job(swap_job, None)
896            .await;
897
898        assert!(result.is_ok());
899        let queue = producer.get_queue().await;
900        assert!(queue.token_swap_request_queue.push_called);
901        assert!(!queue.token_swap_request_queue.schedule_called);
902    }
903
904    #[tokio::test]
905    async fn test_token_swap_job_scheduled() {
906        let producer = TestJobProducer::new();
907
908        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
909        let scheduled_timestamp = calculate_scheduled_timestamp(20);
910        let result = producer
911            .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
912            .await;
913
914        assert!(result.is_ok());
915        let queue = producer.get_queue().await;
916        assert!(queue.token_swap_request_queue.schedule_called);
917        assert!(!queue.token_swap_request_queue.push_called);
918    }
919
920    #[tokio::test]
921    async fn test_transaction_send_cancel_job() {
922        let producer = TestJobProducer::new();
923
924        let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
925        let result = producer
926            .produce_submit_transaction_job(cancel_job, None)
927            .await;
928
929        assert!(result.is_ok());
930        let queue = producer.get_queue().await;
931        assert!(queue.transaction_submission_queue.push_called);
932    }
933
934    #[tokio::test]
935    async fn test_transaction_send_resubmit_job() {
936        let producer = TestJobProducer::new();
937
938        let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
939        let result = producer
940            .produce_submit_transaction_job(resubmit_job, None)
941            .await;
942
943        assert!(result.is_ok());
944        let queue = producer.get_queue().await;
945        assert!(queue.transaction_submission_queue.push_called);
946    }
947
948    #[tokio::test]
949    async fn test_transaction_send_resend_job() {
950        let producer = TestJobProducer::new();
951
952        let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
953        let result = producer
954            .produce_submit_transaction_job(resend_job, None)
955            .await;
956
957        assert!(result.is_ok());
958        let queue = producer.get_queue().await;
959        assert!(queue.transaction_submission_queue.push_called);
960    }
961
962    #[tokio::test]
963    async fn test_multiple_jobs_different_queues() {
964        let producer = TestJobProducer::new();
965
966        // Produce different types of jobs
967        let request = TransactionRequest::new("tx1", "relayer-1");
968        producer
969            .produce_transaction_request_job(request, None)
970            .await
971            .unwrap();
972
973        let submit = TransactionSend::submit("tx2", "relayer-1");
974        producer
975            .produce_submit_transaction_job(submit, None)
976            .await
977            .unwrap();
978
979        use crate::models::NetworkType;
980        let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
981        producer
982            .produce_check_transaction_status_job(status, None)
983            .await
984            .unwrap();
985
986        // Verify all queues were used
987        let queue = producer.get_queue().await;
988        assert!(queue.transaction_request_queue.push_called);
989        assert!(queue.transaction_submission_queue.push_called);
990        assert!(queue.transaction_status_queue_evm.push_called);
991    }
992
993    #[test]
994    fn test_job_producer_clone() {
995        let producer = TestJobProducer::new();
996        let cloned_producer = producer.clone();
997
998        // Both should be valid instances
999        // The clone creates a new Mutex with a cloned Queue
1000        assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1001    }
1002
1003    #[tokio::test]
1004    async fn test_transaction_request_with_metadata() {
1005        let producer = TestJobProducer::new();
1006
1007        let mut metadata = std::collections::HashMap::new();
1008        metadata.insert("retry_count".to_string(), "3".to_string());
1009
1010        let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1011
1012        let result = producer
1013            .produce_transaction_request_job(request, None)
1014            .await;
1015
1016        assert!(result.is_ok());
1017        let queue = producer.get_queue().await;
1018        assert!(queue.transaction_request_queue.push_called);
1019    }
1020
1021    #[tokio::test]
1022    async fn test_status_check_with_metadata() {
1023        use crate::models::NetworkType;
1024        let producer = TestJobProducer::new();
1025
1026        let mut metadata = std::collections::HashMap::new();
1027        metadata.insert("attempt".to_string(), "2".to_string());
1028
1029        let status =
1030            TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1031                .with_metadata(metadata);
1032
1033        let result = producer
1034            .produce_check_transaction_status_job(status, None)
1035            .await;
1036
1037        assert!(result.is_ok());
1038        let queue = producer.get_queue().await;
1039        assert!(queue.transaction_status_queue_stellar.push_called);
1040    }
1041
1042    #[tokio::test]
1043    async fn test_scheduled_jobs_with_different_delays() {
1044        let producer = TestJobProducer::new();
1045
1046        // Test with various scheduling delays
1047        let delays = vec![1, 10, 60, 300, 3600]; // 1s, 10s, 1m, 5m, 1h
1048
1049        for (idx, delay) in delays.iter().enumerate() {
1050            let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1051            let timestamp = calculate_scheduled_timestamp(*delay);
1052
1053            let result = producer
1054                .produce_transaction_request_job(request, Some(timestamp))
1055                .await;
1056
1057            assert!(
1058                result.is_ok(),
1059                "Failed to schedule job with delay {}",
1060                delay
1061            );
1062        }
1063    }
1064
1065    #[test]
1066    fn test_job_producer_error_display() {
1067        let error = JobProducerError::QueueError("Test queue error".to_string());
1068        let error_string = error.to_string();
1069
1070        assert!(error_string.contains("Queue error"));
1071        assert!(error_string.contains("Test queue error"));
1072    }
1073
1074    #[test]
1075    fn test_job_producer_error_to_relayer_error() {
1076        let job_error = JobProducerError::QueueError("Connection failed".to_string());
1077        let relayer_error: RelayerError = job_error.into();
1078
1079        match relayer_error {
1080            RelayerError::QueueError(msg) => {
1081                assert_eq!(msg, "Queue error");
1082            }
1083            _ => panic!("Expected QueueError variant"),
1084        }
1085    }
1086}