1use crate::{
10 jobs::{
11 Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, error};
24
25use super::{JobType, TokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize, Clone)]
31pub enum JobProducerError {
32 #[error("Queue error: {0}")]
33 QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37 fn from(_: RedisError) -> Self {
38 JobProducerError::QueueError("Queue error".to_string())
39 }
40}
41
42impl From<JobProducerError> for RelayerError {
43 fn from(_: JobProducerError) -> Self {
44 RelayerError::QueueError("Queue error".to_string())
45 }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50 queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54 fn clone(&self) -> Self {
55 let queue = self
58 .queue
59 .try_lock()
60 .expect("Failed to lock queue for cloning")
61 .clone();
62
63 Self {
64 queue: Mutex::new(queue),
65 }
66 }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72 async fn produce_transaction_request_job(
73 &self,
74 transaction_process_job: TransactionRequest,
75 scheduled_on: Option<i64>,
76 ) -> Result<(), JobProducerError>;
77
78 async fn produce_submit_transaction_job(
79 &self,
80 transaction_submit_job: TransactionSend,
81 scheduled_on: Option<i64>,
82 ) -> Result<(), JobProducerError>;
83
84 async fn produce_check_transaction_status_job(
85 &self,
86 transaction_status_check_job: TransactionStatusCheck,
87 scheduled_on: Option<i64>,
88 ) -> Result<(), JobProducerError>;
89
90 async fn produce_send_notification_job(
91 &self,
92 notification_send_job: NotificationSend,
93 scheduled_on: Option<i64>,
94 ) -> Result<(), JobProducerError>;
95
96 async fn produce_token_swap_request_job(
97 &self,
98 swap_request_job: TokenSwapRequest,
99 scheduled_on: Option<i64>,
100 ) -> Result<(), JobProducerError>;
101
102 async fn produce_relayer_health_check_job(
103 &self,
104 relayer_health_check_job: RelayerHealthCheck,
105 scheduled_on: Option<i64>,
106 ) -> Result<(), JobProducerError>;
107
108 async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112 pub fn new(queue: Queue) -> Self {
113 Self {
114 queue: Mutex::new(queue.clone()),
115 }
116 }
117
118 pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119 let queue = self.queue.lock().await;
120
121 Ok(queue.clone())
122 }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128 let queue = self.queue.lock().await;
129
130 Ok(queue.clone())
131 }
132
133 async fn produce_transaction_request_job(
134 &self,
135 transaction_process_job: TransactionRequest,
136 scheduled_on: Option<i64>,
137 ) -> Result<(), JobProducerError> {
138 debug!(
139 "Producing transaction request job: {:?}",
140 transaction_process_job
141 );
142 let mut queue = self.queue.lock().await;
143 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144 .with_request_id(get_request_id());
145
146 match scheduled_on {
147 Some(scheduled_on) => {
148 queue
149 .transaction_request_queue
150 .schedule(job, scheduled_on)
151 .await?;
152 }
153 None => {
154 queue.transaction_request_queue.push(job).await?;
155 }
156 }
157 debug!("Transaction job produced successfully");
158
159 Ok(())
160 }
161
162 async fn produce_submit_transaction_job(
163 &self,
164 transaction_submit_job: TransactionSend,
165 scheduled_on: Option<i64>,
166 ) -> Result<(), JobProducerError> {
167 let mut queue = self.queue.lock().await;
168 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169 .with_request_id(get_request_id());
170
171 match scheduled_on {
172 Some(on) => {
173 queue.transaction_submission_queue.schedule(job, on).await?;
174 }
175 None => {
176 queue.transaction_submission_queue.push(job).await?;
177 }
178 }
179 debug!("Transaction Submit job produced successfully");
180
181 Ok(())
182 }
183
184 async fn produce_check_transaction_status_job(
185 &self,
186 transaction_status_check_job: TransactionStatusCheck,
187 scheduled_on: Option<i64>,
188 ) -> Result<(), JobProducerError> {
189 let mut queue = self.queue.lock().await;
190 let job = Job::new(
191 JobType::TransactionStatusCheck,
192 transaction_status_check_job.clone(),
193 )
194 .with_request_id(get_request_id());
195
196 use crate::models::NetworkType;
198 let status_queue = match transaction_status_check_job.network_type {
199 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
200 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
201 _ => &mut queue.transaction_status_queue, };
203
204 match scheduled_on {
205 Some(on) => {
206 status_queue.schedule(job, on).await?;
207 }
208 None => {
209 status_queue.push(job).await?;
210 }
211 }
212 debug!(
213 network_type = ?transaction_status_check_job.network_type,
214 "Transaction Status Check job produced successfully"
215 );
216 Ok(())
217 }
218
219 async fn produce_send_notification_job(
220 &self,
221 notification_send_job: NotificationSend,
222 scheduled_on: Option<i64>,
223 ) -> Result<(), JobProducerError> {
224 let mut queue = self.queue.lock().await;
225 let job = Job::new(JobType::NotificationSend, notification_send_job)
226 .with_request_id(get_request_id());
227
228 match scheduled_on {
229 Some(on) => {
230 queue.notification_queue.schedule(job, on).await?;
231 }
232 None => {
233 queue.notification_queue.push(job).await?;
234 }
235 }
236
237 debug!("Notification Send job produced successfully");
238 Ok(())
239 }
240
241 async fn produce_token_swap_request_job(
242 &self,
243 swap_request_job: TokenSwapRequest,
244 scheduled_on: Option<i64>,
245 ) -> Result<(), JobProducerError> {
246 let mut queue = self.queue.lock().await;
247 let job =
248 Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
249
250 match scheduled_on {
251 Some(on) => {
252 queue.token_swap_request_queue.schedule(job, on).await?;
253 }
254 None => {
255 queue.token_swap_request_queue.push(job).await?;
256 }
257 }
258
259 debug!("Token swap job produced successfully");
260 Ok(())
261 }
262
263 async fn produce_relayer_health_check_job(
264 &self,
265 relayer_health_check_job: RelayerHealthCheck,
266 scheduled_on: Option<i64>,
267 ) -> Result<(), JobProducerError> {
268 let job = Job::new(
269 JobType::RelayerHealthCheck,
270 relayer_health_check_job.clone(),
271 )
272 .with_request_id(get_request_id());
273
274 let mut queue = self.queue.lock().await;
275
276 match scheduled_on {
277 Some(scheduled_on) => {
278 queue
279 .relayer_health_check_queue
280 .schedule(job, scheduled_on)
281 .await?;
282 }
283 None => {
284 queue.relayer_health_check_queue.push(job).await?;
285 }
286 }
287
288 Ok(())
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use crate::models::{
296 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
297 WebhookPayload, U256,
298 };
299 use crate::utils::calculate_scheduled_timestamp;
300
301 #[derive(Clone, Debug)]
302 struct TestRedisStorage<T> {
304 pub push_called: bool,
305 pub schedule_called: bool,
306 _phantom: std::marker::PhantomData<T>,
307 }
308
309 impl<T> TestRedisStorage<T> {
310 fn new() -> Self {
311 Self {
312 push_called: false,
313 schedule_called: false,
314 _phantom: std::marker::PhantomData,
315 }
316 }
317
318 async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
319 self.push_called = true;
320 Ok(())
321 }
322
323 async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
324 self.schedule_called = true;
325 Ok(())
326 }
327 }
328
329 #[derive(Clone, Debug)]
331 struct TestQueue {
332 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
333 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
334 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
335 pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
336 pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
337 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
338 pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
339 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
340 }
341
342 impl TestQueue {
343 fn new() -> Self {
344 Self {
345 transaction_request_queue: TestRedisStorage::new(),
346 transaction_submission_queue: TestRedisStorage::new(),
347 transaction_status_queue: TestRedisStorage::new(),
348 transaction_status_queue_evm: TestRedisStorage::new(),
349 transaction_status_queue_stellar: TestRedisStorage::new(),
350 notification_queue: TestRedisStorage::new(),
351 token_swap_request_queue: TestRedisStorage::new(),
352 relayer_health_check_queue: TestRedisStorage::new(),
353 }
354 }
355 }
356
357 struct TestJobProducer {
359 queue: Mutex<TestQueue>,
360 }
361
362 impl Clone for TestJobProducer {
363 fn clone(&self) -> Self {
364 let queue = self
365 .queue
366 .try_lock()
367 .expect("Failed to lock queue for cloning")
368 .clone();
369 Self {
370 queue: Mutex::new(queue),
371 }
372 }
373 }
374
375 impl TestJobProducer {
376 fn new() -> Self {
377 Self {
378 queue: Mutex::new(TestQueue::new()),
379 }
380 }
381
382 async fn get_queue(&self) -> TestQueue {
383 self.queue.lock().await.clone()
384 }
385 }
386
387 #[async_trait]
388 impl JobProducerTrait for TestJobProducer {
389 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
390 unimplemented!("get_queue not used in tests")
391 }
392
393 async fn produce_transaction_request_job(
394 &self,
395 transaction_process_job: TransactionRequest,
396 scheduled_on: Option<i64>,
397 ) -> Result<(), JobProducerError> {
398 let mut queue = self.queue.lock().await;
399 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
400
401 match scheduled_on {
402 Some(scheduled_on) => {
403 queue
404 .transaction_request_queue
405 .schedule(job, scheduled_on)
406 .await?;
407 }
408 None => {
409 queue.transaction_request_queue.push(job).await?;
410 }
411 }
412
413 Ok(())
414 }
415
416 async fn produce_submit_transaction_job(
417 &self,
418 transaction_submit_job: TransactionSend,
419 scheduled_on: Option<i64>,
420 ) -> Result<(), JobProducerError> {
421 let mut queue = self.queue.lock().await;
422 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
423
424 match scheduled_on {
425 Some(on) => {
426 queue.transaction_submission_queue.schedule(job, on).await?;
427 }
428 None => {
429 queue.transaction_submission_queue.push(job).await?;
430 }
431 }
432
433 Ok(())
434 }
435
436 async fn produce_check_transaction_status_job(
437 &self,
438 transaction_status_check_job: TransactionStatusCheck,
439 scheduled_on: Option<i64>,
440 ) -> Result<(), JobProducerError> {
441 let mut queue = self.queue.lock().await;
442 let job = Job::new(
443 JobType::TransactionStatusCheck,
444 transaction_status_check_job.clone(),
445 );
446
447 use crate::models::NetworkType;
449 let status_queue = match transaction_status_check_job.network_type {
450 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
451 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
452 Some(NetworkType::Solana) => &mut queue.transaction_status_queue, None => &mut queue.transaction_status_queue, };
455
456 match scheduled_on {
457 Some(on) => {
458 status_queue.schedule(job, on).await?;
459 }
460 None => {
461 status_queue.push(job).await?;
462 }
463 }
464
465 Ok(())
466 }
467
468 async fn produce_send_notification_job(
469 &self,
470 notification_send_job: NotificationSend,
471 scheduled_on: Option<i64>,
472 ) -> Result<(), JobProducerError> {
473 let mut queue = self.queue.lock().await;
474 let job = Job::new(JobType::NotificationSend, notification_send_job);
475
476 match scheduled_on {
477 Some(on) => {
478 queue.notification_queue.schedule(job, on).await?;
479 }
480 None => {
481 queue.notification_queue.push(job).await?;
482 }
483 }
484
485 Ok(())
486 }
487
488 async fn produce_token_swap_request_job(
489 &self,
490 swap_request_job: TokenSwapRequest,
491 scheduled_on: Option<i64>,
492 ) -> Result<(), JobProducerError> {
493 let mut queue = self.queue.lock().await;
494 let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
495
496 match scheduled_on {
497 Some(on) => {
498 queue.token_swap_request_queue.schedule(job, on).await?;
499 }
500 None => {
501 queue.token_swap_request_queue.push(job).await?;
502 }
503 }
504
505 Ok(())
506 }
507
508 async fn produce_relayer_health_check_job(
509 &self,
510 relayer_health_check_job: RelayerHealthCheck,
511 scheduled_on: Option<i64>,
512 ) -> Result<(), JobProducerError> {
513 let mut queue = self.queue.lock().await;
514 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
515
516 match scheduled_on {
517 Some(scheduled_on) => {
518 queue
519 .relayer_health_check_queue
520 .schedule(job, scheduled_on)
521 .await?;
522 }
523 None => {
524 queue.relayer_health_check_queue.push(job).await?;
525 }
526 }
527
528 Ok(())
529 }
530 }
531
532 #[tokio::test]
533 async fn test_job_producer_operations() {
534 let producer = TestJobProducer::new();
535
536 let request = TransactionRequest::new("tx123", "relayer-1");
538 let result = producer
539 .produce_transaction_request_job(request, None)
540 .await;
541 assert!(result.is_ok());
542
543 let queue = producer.get_queue().await;
544 assert!(queue.transaction_request_queue.push_called);
545
546 let producer = TestJobProducer::new();
548 let request = TransactionRequest::new("tx123", "relayer-1");
549 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
551 .produce_transaction_request_job(request, Some(scheduled_timestamp))
552 .await;
553 assert!(result.is_ok());
554
555 let queue = producer.get_queue().await;
556 assert!(queue.transaction_request_queue.schedule_called);
557 }
558
559 #[tokio::test]
560 async fn test_submit_transaction_job() {
561 let producer = TestJobProducer::new();
562
563 let submit_job = TransactionSend::submit("tx123", "relayer-1");
565 let result = producer
566 .produce_submit_transaction_job(submit_job, None)
567 .await;
568 assert!(result.is_ok());
569
570 let queue = producer.get_queue().await;
571 assert!(queue.transaction_submission_queue.push_called);
572 }
573
574 #[tokio::test]
575 async fn test_check_status_job() {
576 use crate::models::NetworkType;
577 let producer = TestJobProducer::new();
578
579 let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
581 let result = producer
582 .produce_check_transaction_status_job(status_job, None)
583 .await;
584 assert!(result.is_ok());
585
586 let queue = producer.get_queue().await;
587 assert!(queue.transaction_status_queue_evm.push_called);
588 }
589
590 #[tokio::test]
591 async fn test_notification_job() {
592 let producer = TestJobProducer::new();
593
594 let notification = WebhookNotification::new(
596 "test_event".to_string(),
597 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
598 EvmTransactionResponse {
599 id: "tx123".to_string(),
600 hash: Some("0x123".to_string()),
601 status: TransactionStatus::Confirmed,
602 status_reason: None,
603 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
604 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
605 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
606 gas_price: Some(1000000000),
607 gas_limit: Some(21000),
608 nonce: Some(1),
609 value: U256::from(1000000000000000000_u64),
610 from: "0xabc".to_string(),
611 to: Some("0xdef".to_string()),
612 relayer_id: "relayer-1".to_string(),
613 data: None,
614 max_fee_per_gas: None,
615 max_priority_fee_per_gas: None,
616 signature: None,
617 speed: None,
618 },
619 ))),
620 );
621 let job = NotificationSend::new("notification-1".to_string(), notification);
622
623 let result = producer.produce_send_notification_job(job, None).await;
624 assert!(result.is_ok());
625
626 let queue = producer.get_queue().await;
627 assert!(queue.notification_queue.push_called);
628 }
629
630 #[tokio::test]
631 async fn test_relayer_health_check_job() {
632 let producer = TestJobProducer::new();
633
634 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
636 let result = producer
637 .produce_relayer_health_check_job(health_check, None)
638 .await;
639 assert!(result.is_ok());
640
641 let queue = producer.get_queue().await;
642 assert!(queue.relayer_health_check_queue.push_called);
643
644 let producer = TestJobProducer::new();
646 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
647 let scheduled_timestamp = calculate_scheduled_timestamp(60);
648 let result = producer
649 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
650 .await;
651 assert!(result.is_ok());
652
653 let queue = producer.get_queue().await;
654 assert!(queue.relayer_health_check_queue.schedule_called);
655 }
656
657 #[test]
658 fn test_job_producer_error_conversion() {
659 let job_error = JobProducerError::QueueError("Test error".to_string());
661 let relayer_error: RelayerError = job_error.into();
662
663 match relayer_error {
664 RelayerError::QueueError(msg) => {
665 assert_eq!(msg, "Queue error");
666 }
667 _ => panic!("Unexpected error type"),
668 }
669 }
670
671 #[tokio::test]
672 async fn test_get_queue() {
673 let producer = TestJobProducer::new();
674
675 let queue = producer.get_queue().await;
677
678 assert!(!queue.transaction_request_queue.push_called);
680 assert!(!queue.transaction_request_queue.schedule_called);
681 assert!(!queue.transaction_submission_queue.push_called);
682 assert!(!queue.notification_queue.push_called);
683 assert!(!queue.token_swap_request_queue.push_called);
684 assert!(!queue.relayer_health_check_queue.push_called);
685 }
686
687 #[tokio::test]
688 async fn test_produce_relayer_health_check_job_immediate() {
689 let producer = TestJobProducer::new();
690
691 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
693 let result = producer
694 .produce_relayer_health_check_job(health_check, None)
695 .await;
696
697 assert!(result.is_ok());
699
700 let queue = producer.get_queue().await;
702 assert!(queue.relayer_health_check_queue.push_called);
703 assert!(!queue.relayer_health_check_queue.schedule_called);
704
705 assert!(!queue.transaction_request_queue.push_called);
707 assert!(!queue.transaction_submission_queue.push_called);
708 assert!(!queue.transaction_status_queue.push_called);
709 assert!(!queue.notification_queue.push_called);
710 assert!(!queue.token_swap_request_queue.push_called);
711 }
712
713 #[tokio::test]
714 async fn test_produce_relayer_health_check_job_scheduled() {
715 let producer = TestJobProducer::new();
716
717 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
719 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
721 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
722 .await;
723
724 assert!(result.is_ok());
726
727 let queue = producer.get_queue().await;
729 assert!(queue.relayer_health_check_queue.schedule_called);
730 assert!(!queue.relayer_health_check_queue.push_called);
731
732 assert!(!queue.transaction_request_queue.push_called);
734 assert!(!queue.transaction_submission_queue.push_called);
735 assert!(!queue.transaction_status_queue.push_called);
736 assert!(!queue.notification_queue.push_called);
737 assert!(!queue.token_swap_request_queue.push_called);
738 }
739
740 #[tokio::test]
741 async fn test_produce_relayer_health_check_job_multiple_relayers() {
742 let producer = TestJobProducer::new();
743
744 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
746
747 for relayer_id in &relayer_ids {
748 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
749 let result = producer
750 .produce_relayer_health_check_job(health_check, None)
751 .await;
752 assert!(result.is_ok());
753 }
754
755 let queue = producer.get_queue().await;
757 assert!(queue.relayer_health_check_queue.push_called);
758 }
759
760 #[tokio::test]
761 async fn test_status_check_routes_to_evm_queue() {
762 use crate::models::NetworkType;
763 let producer = TestJobProducer::new();
764
765 let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
766 let result = producer
767 .produce_check_transaction_status_job(status_job, None)
768 .await;
769
770 assert!(result.is_ok());
771 let queue = producer.get_queue().await;
772 assert!(queue.transaction_status_queue_evm.push_called);
773 assert!(!queue.transaction_status_queue_stellar.push_called);
774 assert!(!queue.transaction_status_queue.push_called);
775 }
776
777 #[tokio::test]
778 async fn test_status_check_routes_to_stellar_queue() {
779 use crate::models::NetworkType;
780 let producer = TestJobProducer::new();
781
782 let status_job =
783 TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
784 let result = producer
785 .produce_check_transaction_status_job(status_job, None)
786 .await;
787
788 assert!(result.is_ok());
789 let queue = producer.get_queue().await;
790 assert!(queue.transaction_status_queue_stellar.push_called);
791 assert!(!queue.transaction_status_queue_evm.push_called);
792 assert!(!queue.transaction_status_queue.push_called);
793 }
794
795 #[tokio::test]
796 async fn test_status_check_routes_to_default_queue_for_solana() {
797 use crate::models::NetworkType;
798 let producer = TestJobProducer::new();
799
800 let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
801 let result = producer
802 .produce_check_transaction_status_job(status_job, None)
803 .await;
804
805 assert!(result.is_ok());
806 let queue = producer.get_queue().await;
807 assert!(queue.transaction_status_queue.push_called);
808 assert!(!queue.transaction_status_queue_evm.push_called);
809 assert!(!queue.transaction_status_queue_stellar.push_called);
810 }
811
812 #[tokio::test]
813 async fn test_status_check_scheduled_evm() {
814 use crate::models::NetworkType;
815 let producer = TestJobProducer::new();
816
817 let status_job =
818 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
819 let scheduled_timestamp = calculate_scheduled_timestamp(30);
820 let result = producer
821 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
822 .await;
823
824 assert!(result.is_ok());
825 let queue = producer.get_queue().await;
826 assert!(queue.transaction_status_queue_evm.schedule_called);
827 assert!(!queue.transaction_status_queue_evm.push_called);
828 }
829
830 #[tokio::test]
831 async fn test_submit_transaction_scheduled() {
832 let producer = TestJobProducer::new();
833
834 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
835 let scheduled_timestamp = calculate_scheduled_timestamp(15);
836 let result = producer
837 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
838 .await;
839
840 assert!(result.is_ok());
841 let queue = producer.get_queue().await;
842 assert!(queue.transaction_submission_queue.schedule_called);
843 assert!(!queue.transaction_submission_queue.push_called);
844 }
845
846 #[tokio::test]
847 async fn test_notification_job_scheduled() {
848 let producer = TestJobProducer::new();
849
850 let notification = WebhookNotification::new(
851 "test_scheduled_event".to_string(),
852 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
853 EvmTransactionResponse {
854 id: "tx-notify-scheduled".to_string(),
855 hash: Some("0xabc123".to_string()),
856 status: TransactionStatus::Confirmed,
857 status_reason: None,
858 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
859 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
860 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
861 gas_price: Some(1000000000),
862 gas_limit: Some(21000),
863 nonce: Some(1),
864 value: U256::from(1000000000000000000_u64),
865 from: "0xabc".to_string(),
866 to: Some("0xdef".to_string()),
867 relayer_id: "relayer-1".to_string(),
868 data: None,
869 max_fee_per_gas: None,
870 max_priority_fee_per_gas: None,
871 signature: None,
872 speed: None,
873 },
874 ))),
875 );
876 let job = NotificationSend::new("notification-scheduled".to_string(), notification);
877
878 let scheduled_timestamp = calculate_scheduled_timestamp(5);
879 let result = producer
880 .produce_send_notification_job(job, Some(scheduled_timestamp))
881 .await;
882
883 assert!(result.is_ok());
884 let queue = producer.get_queue().await;
885 assert!(queue.notification_queue.schedule_called);
886 assert!(!queue.notification_queue.push_called);
887 }
888
889 #[tokio::test]
890 async fn test_solana_swap_job_immediate() {
891 let producer = TestJobProducer::new();
892
893 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
894 let result = producer
895 .produce_token_swap_request_job(swap_job, None)
896 .await;
897
898 assert!(result.is_ok());
899 let queue = producer.get_queue().await;
900 assert!(queue.token_swap_request_queue.push_called);
901 assert!(!queue.token_swap_request_queue.schedule_called);
902 }
903
904 #[tokio::test]
905 async fn test_token_swap_job_scheduled() {
906 let producer = TestJobProducer::new();
907
908 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
909 let scheduled_timestamp = calculate_scheduled_timestamp(20);
910 let result = producer
911 .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
912 .await;
913
914 assert!(result.is_ok());
915 let queue = producer.get_queue().await;
916 assert!(queue.token_swap_request_queue.schedule_called);
917 assert!(!queue.token_swap_request_queue.push_called);
918 }
919
920 #[tokio::test]
921 async fn test_transaction_send_cancel_job() {
922 let producer = TestJobProducer::new();
923
924 let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
925 let result = producer
926 .produce_submit_transaction_job(cancel_job, None)
927 .await;
928
929 assert!(result.is_ok());
930 let queue = producer.get_queue().await;
931 assert!(queue.transaction_submission_queue.push_called);
932 }
933
934 #[tokio::test]
935 async fn test_transaction_send_resubmit_job() {
936 let producer = TestJobProducer::new();
937
938 let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
939 let result = producer
940 .produce_submit_transaction_job(resubmit_job, None)
941 .await;
942
943 assert!(result.is_ok());
944 let queue = producer.get_queue().await;
945 assert!(queue.transaction_submission_queue.push_called);
946 }
947
948 #[tokio::test]
949 async fn test_transaction_send_resend_job() {
950 let producer = TestJobProducer::new();
951
952 let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
953 let result = producer
954 .produce_submit_transaction_job(resend_job, None)
955 .await;
956
957 assert!(result.is_ok());
958 let queue = producer.get_queue().await;
959 assert!(queue.transaction_submission_queue.push_called);
960 }
961
962 #[tokio::test]
963 async fn test_multiple_jobs_different_queues() {
964 let producer = TestJobProducer::new();
965
966 let request = TransactionRequest::new("tx1", "relayer-1");
968 producer
969 .produce_transaction_request_job(request, None)
970 .await
971 .unwrap();
972
973 let submit = TransactionSend::submit("tx2", "relayer-1");
974 producer
975 .produce_submit_transaction_job(submit, None)
976 .await
977 .unwrap();
978
979 use crate::models::NetworkType;
980 let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
981 producer
982 .produce_check_transaction_status_job(status, None)
983 .await
984 .unwrap();
985
986 let queue = producer.get_queue().await;
988 assert!(queue.transaction_request_queue.push_called);
989 assert!(queue.transaction_submission_queue.push_called);
990 assert!(queue.transaction_status_queue_evm.push_called);
991 }
992
993 #[test]
994 fn test_job_producer_clone() {
995 let producer = TestJobProducer::new();
996 let cloned_producer = producer.clone();
997
998 assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1001 }
1002
1003 #[tokio::test]
1004 async fn test_transaction_request_with_metadata() {
1005 let producer = TestJobProducer::new();
1006
1007 let mut metadata = std::collections::HashMap::new();
1008 metadata.insert("retry_count".to_string(), "3".to_string());
1009
1010 let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1011
1012 let result = producer
1013 .produce_transaction_request_job(request, None)
1014 .await;
1015
1016 assert!(result.is_ok());
1017 let queue = producer.get_queue().await;
1018 assert!(queue.transaction_request_queue.push_called);
1019 }
1020
1021 #[tokio::test]
1022 async fn test_status_check_with_metadata() {
1023 use crate::models::NetworkType;
1024 let producer = TestJobProducer::new();
1025
1026 let mut metadata = std::collections::HashMap::new();
1027 metadata.insert("attempt".to_string(), "2".to_string());
1028
1029 let status =
1030 TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1031 .with_metadata(metadata);
1032
1033 let result = producer
1034 .produce_check_transaction_status_job(status, None)
1035 .await;
1036
1037 assert!(result.is_ok());
1038 let queue = producer.get_queue().await;
1039 assert!(queue.transaction_status_queue_stellar.push_called);
1040 }
1041
1042 #[tokio::test]
1043 async fn test_scheduled_jobs_with_different_delays() {
1044 let producer = TestJobProducer::new();
1045
1046 let delays = vec![1, 10, 60, 300, 3600]; for (idx, delay) in delays.iter().enumerate() {
1050 let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1051 let timestamp = calculate_scheduled_timestamp(*delay);
1052
1053 let result = producer
1054 .produce_transaction_request_job(request, Some(timestamp))
1055 .await;
1056
1057 assert!(
1058 result.is_ok(),
1059 "Failed to schedule job with delay {}",
1060 delay
1061 );
1062 }
1063 }
1064
1065 #[test]
1066 fn test_job_producer_error_display() {
1067 let error = JobProducerError::QueueError("Test queue error".to_string());
1068 let error_string = error.to_string();
1069
1070 assert!(error_string.contains("Queue error"));
1071 assert!(error_string.contains("Test queue error"));
1072 }
1073
1074 #[test]
1075 fn test_job_producer_error_to_relayer_error() {
1076 let job_error = JobProducerError::QueueError("Connection failed".to_string());
1077 let relayer_error: RelayerError = job_error.into();
1078
1079 match relayer_error {
1080 RelayerError::QueueError(msg) => {
1081 assert_eq!(msg, "Queue error");
1082 }
1083 _ => panic!("Expected QueueError variant"),
1084 }
1085 }
1086}