1use crate::{
7 constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8 domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9 jobs::{JobProducer, JobProducerTrait, TransactionRequest},
10 models::{
11 produce_transaction_update_notification_payload, NetworkTransactionRequest,
12 RelayerNetworkPolicy, RelayerRepoModel, TransactionError, TransactionRepoModel,
13 TransactionStatus, TransactionUpdateRequest,
14 },
15 repositories::{
16 RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17 TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18 },
19 services::{
20 provider::{StellarProvider, StellarProviderTrait},
21 signer::{Signer, StellarSigner},
22 stellar_dex::{OrderBookService, StellarDexServiceTrait},
23 },
24 utils::calculate_scheduled_timestamp,
25};
26use async_trait::async_trait;
27use std::sync::Arc;
28use tracing::{error, info};
29
30use super::lane_gate;
31
/// Transaction handler for a single Stellar relayer.
///
/// Bundles the relayer model with the collaborators the handler methods use: repositories,
/// a job producer, a signer, a chain provider, a sequence counter and a DEX service.
/// Each collaborator is a generic parameter; everything except the provider is held in an
/// [`Arc`].
// NOTE(review): `dead_code` is allowed on the whole struct; presumably because some fields
// (e.g. `relayer_repository`, which has no accessor in this file) are not read yet — confirm.
#[allow(dead_code)]
pub struct StellarRelayerTransaction<R, T, J, S, P, C, D>
where
    R: Repository<RelayerRepoModel, String>,
    T: TransactionRepository,
    J: JobProducerTrait,
    S: Signer,
    P: StellarProviderTrait,
    C: TransactionCounterTrait,
    D: StellarDexServiceTrait + Send + Sync + 'static,
{
    /// Relayer this handler acts for; its `id`, `policies` and `notification_id` are read
    /// by the handler methods.
    relayer: RelayerRepoModel,
    /// Repository of relayer models (stored but not accessed in this file).
    relayer_repository: Arc<R>,
    /// Repository used to query and update transactions.
    transaction_repository: Arc<T>,
    /// Producer for transaction-request and notification jobs.
    job_producer: Arc<J>,
    /// Signer implementation (only exposed through an accessor in this file).
    signer: Arc<S>,
    /// Chain provider, owned directly rather than behind an `Arc`.
    provider: P,
    /// Counter service holding the relayer's local sequence number.
    transaction_counter_service: Arc<C>,
    /// DEX service (only exposed through an accessor in this file).
    dex_service: Arc<D>,
}
52
53#[allow(dead_code)]
54impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
55where
56 R: Repository<RelayerRepoModel, String>,
57 T: TransactionRepository,
58 J: JobProducerTrait,
59 S: Signer,
60 P: StellarProviderTrait,
61 C: TransactionCounterTrait,
62 D: StellarDexServiceTrait + Send + Sync + 'static,
63{
64 #[allow(clippy::too_many_arguments)]
81 pub fn new(
82 relayer: RelayerRepoModel,
83 relayer_repository: Arc<R>,
84 transaction_repository: Arc<T>,
85 job_producer: Arc<J>,
86 signer: Arc<S>,
87 provider: P,
88 transaction_counter_service: Arc<C>,
89 dex_service: Arc<D>,
90 ) -> Result<Self, TransactionError> {
91 Ok(Self {
92 relayer,
93 relayer_repository,
94 transaction_repository,
95 job_producer,
96 signer,
97 provider,
98 transaction_counter_service,
99 dex_service,
100 })
101 }
102
103 pub fn provider(&self) -> &P {
104 &self.provider
105 }
106
107 pub fn relayer(&self) -> &RelayerRepoModel {
108 &self.relayer
109 }
110
111 pub fn job_producer(&self) -> &J {
112 &self.job_producer
113 }
114
115 pub fn transaction_repository(&self) -> &T {
116 &self.transaction_repository
117 }
118
119 pub fn signer(&self) -> &S {
120 &self.signer
121 }
122
123 pub fn transaction_counter_service(&self) -> &C {
124 &self.transaction_counter_service
125 }
126
127 pub fn dex_service(&self) -> &D {
128 &self.dex_service
129 }
130
131 pub fn concurrent_transactions_enabled(&self) -> bool {
132 if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
133 policy
134 .concurrent_transactions
135 .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
136 } else {
137 DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
138 }
139 }
140
141 pub async fn send_transaction_request_job(
143 &self,
144 tx: &TransactionRepoModel,
145 delay_seconds: Option<i64>,
146 ) -> Result<(), TransactionError> {
147 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
148 let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
149 self.job_producer()
150 .produce_transaction_request_job(job, scheduled_on)
151 .await?;
152 Ok(())
153 }
154
155 pub(super) async fn send_transaction_update_notification(&self, tx: &TransactionRepoModel) {
160 if let Some(notification_id) = &self.relayer().notification_id {
161 if let Err(e) = self
162 .job_producer()
163 .produce_send_notification_job(
164 produce_transaction_update_notification_payload(notification_id, tx),
165 None,
166 )
167 .await
168 {
169 error!(error = %e, "failed to produce notification job");
170 }
171 }
172 }
173
174 pub async fn finalize_transaction_state(
176 &self,
177 tx_id: String,
178 update_req: TransactionUpdateRequest,
179 ) -> Result<TransactionRepoModel, TransactionError> {
180 let updated_tx = self
181 .transaction_repository()
182 .partial_update(tx_id, update_req)
183 .await?;
184
185 self.send_transaction_update_notification(&updated_tx).await;
186 Ok(updated_tx)
187 }
188
189 pub async fn enqueue_next_pending_transaction(
190 &self,
191 finished_tx_id: &str,
192 ) -> Result<(), TransactionError> {
193 if !self.concurrent_transactions_enabled() {
194 if let Some(next) = self
195 .find_oldest_pending_for_relayer(&self.relayer().id)
196 .await?
197 {
198 info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
200 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
201 self.send_transaction_request_job(&next, None).await?;
202 } else {
203 info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
204 lane_gate::free(&self.relayer().id, finished_tx_id);
205 }
206 }
207 Ok(())
208 }
209
210 async fn find_oldest_pending_for_relayer(
212 &self,
213 relayer_id: &str,
214 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
215 let pending_txs = self
216 .transaction_repository()
217 .find_by_status(relayer_id, &[TransactionStatus::Pending])
218 .await
219 .map_err(TransactionError::from)?;
220
221 Ok(pending_txs.into_iter().next())
222 }
223
224 pub async fn sync_sequence_from_chain(
227 &self,
228 relayer_address: &str,
229 ) -> Result<(), TransactionError> {
230 info!(address = %relayer_address, "syncing sequence number from chain");
231
232 let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
234 .await
235 .map_err(TransactionError::UnexpectedError)?;
236
237 self.transaction_counter_service()
239 .set(&self.relayer().id, relayer_address, next_usable_seq)
240 .await
241 .map_err(|e| {
242 TransactionError::UnexpectedError(format!("Failed to update sequence counter: {e}"))
243 })?;
244
245 info!(sequence = %next_usable_seq, "updated local sequence counter");
246 Ok(())
247 }
248
249 pub async fn reset_transaction_for_retry(
252 &self,
253 tx: TransactionRepoModel,
254 ) -> Result<TransactionRepoModel, TransactionError> {
255 info!("resetting transaction for retry through pipeline");
256
257 let update_req = tx.create_reset_update_request()?;
259
260 let reset_tx = self
262 .transaction_repository()
263 .partial_update(tx.id.clone(), update_req)
264 .await?;
265
266 info!("transaction reset successfully to pre-prepare state");
267 Ok(reset_tx)
268 }
269}
270
271#[async_trait]
272impl<R, T, J, S, P, C, D> Transaction for StellarRelayerTransaction<R, T, J, S, P, C, D>
273where
274 R: Repository<RelayerRepoModel, String> + Send + Sync,
275 T: TransactionRepository + Send + Sync,
276 J: JobProducerTrait + Send + Sync,
277 S: Signer + Send + Sync,
278 P: StellarProviderTrait + Send + Sync,
279 C: TransactionCounterTrait + Send + Sync,
280 D: StellarDexServiceTrait + Send + Sync + 'static,
281{
282 async fn prepare_transaction(
283 &self,
284 tx: TransactionRepoModel,
285 ) -> Result<TransactionRepoModel, TransactionError> {
286 self.prepare_transaction_impl(tx).await
287 }
288
289 async fn submit_transaction(
290 &self,
291 tx: TransactionRepoModel,
292 ) -> Result<TransactionRepoModel, TransactionError> {
293 self.submit_transaction_impl(tx).await
294 }
295
296 async fn resubmit_transaction(
297 &self,
298 tx: TransactionRepoModel,
299 ) -> Result<TransactionRepoModel, TransactionError> {
300 Ok(tx)
301 }
302
303 async fn handle_transaction_status(
304 &self,
305 tx: TransactionRepoModel,
306 ) -> Result<TransactionRepoModel, TransactionError> {
307 self.handle_transaction_status_impl(tx).await
308 }
309
310 async fn cancel_transaction(
311 &self,
312 tx: TransactionRepoModel,
313 ) -> Result<TransactionRepoModel, TransactionError> {
314 Ok(tx)
315 }
316
317 async fn replace_transaction(
318 &self,
319 _old_tx: TransactionRepoModel,
320 _new_tx_request: NetworkTransactionRequest,
321 ) -> Result<TransactionRepoModel, TransactionError> {
322 Ok(_old_tx)
323 }
324
325 async fn sign_transaction(
326 &self,
327 tx: TransactionRepoModel,
328 ) -> Result<TransactionRepoModel, TransactionError> {
329 Ok(tx)
330 }
331
332 async fn validate_transaction(
333 &self,
334 _tx: TransactionRepoModel,
335 ) -> Result<bool, TransactionError> {
336 Ok(true)
337 }
338}
339
/// [`StellarRelayerTransaction`] instantiated with the concrete `*Storage` repositories,
/// [`JobProducer`], [`StellarSigner`], [`StellarProvider`] and an [`OrderBookService`] built
/// on that provider and signer.
pub type DefaultStellarTransaction = StellarRelayerTransaction<
    RelayerRepositoryStorage,
    TransactionRepositoryStorage,
    JobProducer,
    StellarSigner,
    StellarProvider,
    TransactionCounterRepositoryStorage,
    OrderBookService<StellarProvider, StellarSigner>,
>;
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        models::{NetworkTransactionData, RepositoryError},
        services::provider::ProviderError,
    };
    use std::sync::Arc;

    use crate::domain::transaction::stellar::test_helpers::*;

    /// Builds the test relayer with `concurrent_transactions` pinned to `enabled`, so tests
    /// do not depend on the value of `DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS`.
    fn relayer_with_concurrency(enabled: bool) -> RelayerRepoModel {
        let mut relayer = create_test_relayer();
        match relayer.policies {
            RelayerNetworkPolicy::Stellar(ref mut policy) => {
                policy.concurrent_transactions = Some(enabled);
            }
            _ => panic!("test relayer must carry a Stellar policy"),
        }
        relayer
    }

    /// Builds an account entry for `TEST_PK` whose sequence number is `seq_num`.
    fn account_entry_with_sequence(seq_num: i64) -> soroban_rs::xdr::AccountEntry {
        use soroban_rs::xdr::{
            AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
            Thresholds, Uint256,
        };
        use stellar_strkey::ed25519;

        let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
        let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));

        AccountEntry {
            account_id,
            balance: 1000000,
            seq_num: SequenceNumber(seq_num),
            num_sub_entries: 0,
            inflation_dest: None,
            flags: 0,
            home_domain: String32::default(),
            thresholds: Thresholds([1, 1, 1, 1]),
            signers: Default::default(),
            ext: AccountEntryExt::V0,
        }
    }

    #[test]
    fn new_returns_ok() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let result = StellarRelayerTransaction::new(
            relayer,
            Arc::new(mocks.relayer_repo),
            Arc::new(mocks.tx_repo),
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
            Arc::new(mocks.dex_service),
        );
        assert!(result.is_ok());
    }

    #[test]
    fn accessor_methods_return_correct_references() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);

        assert_eq!(handler.relayer().id, "relayer-1");
        assert_eq!(handler.relayer().address, TEST_PK);

        // The remaining accessors only need to be callable.
        let _ = handler.provider();
        let _ = handler.job_producer();
        let _ = handler.transaction_repository();
        let _ = handler.signer();
        let _ = handler.transaction_counter_service();
        let _ = handler.dex_service();
    }

    #[tokio::test]
    async fn send_transaction_request_job_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, None).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn send_transaction_request_job_with_delay() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1"
                    && job.relayer_id == "relayer-1"
                    && matches!(delay, Some(ts) if *ts > chrono::Utc::now().timestamp())
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn finalize_transaction_state_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The repository echoes the requested update back on a fresh test transaction.
        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, update| {
                tx_id == "tx-1"
                    && update.status == Some(TransactionStatus::Confirmed)
                    && update.status_reason == Some("Transaction confirmed".to_string())
            })
            .times(1)
            .returning(|tx_id, update| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = tx_id;
                tx.status = update.status.unwrap();
                tx.status_reason = update.status_reason;
                tx.confirmed_at = update.confirmed_at;
                Ok::<_, RepositoryError>(tx)
            });

        // Exactly one notification job is expected after the update.
        mocks
            .job_producer
            .expect_produce_send_notification_job()
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let update_request = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction confirmed".to_string()),
            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
            ..Default::default()
        };

        let result = handler
            .finalize_transaction_state("tx-1".to_string(), update_request)
            .await;

        assert!(result.is_ok());
        let updated_tx = result.unwrap();
        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated_tx.status_reason,
            Some("Transaction confirmed".to_string())
        );
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_with_pending_tx() {
        // Lane hand-over only happens with concurrency disabled; pin it explicitly.
        let relayer = relayer_with_concurrency(false);
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status()
            .withf(|relayer_id, statuses| {
                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
            })
            .times(1)
            .returning(|_, _| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = "pending-tx-1".to_string();
                Ok(vec![tx])
            });

        // The pending transaction must be enqueued immediately (no delay).
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_no_pending_tx() {
        // Lane release only happens with concurrency disabled; pin it explicitly.
        let relayer = relayer_with_concurrency(false);
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status()
            .times(1)
            .returning(|_, _| Ok(vec![]));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .withf(|addr| addr == TEST_PK)
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        // On-chain sequence 100 must be stored as next usable sequence 101.
        mocks
            .counter
            .expect_set()
            .withf(|relayer_id, addr, seq| {
                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
            })
            .times(1)
            .returning(|_, _, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_provider_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks.provider.expect_get_account().times(1).returning(|_| {
            Box::pin(async { Err(ProviderError::Other("Account not found".to_string())) })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to fetch account from chain"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_counter_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        mocks.counter.expect_set().times(1).returning(|_, _, _| {
            Box::pin(async {
                Err(RepositoryError::Unknown(
                    "Counter update failed".to_string(),
                ))
            })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to update sequence counter"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[test]
    fn test_concurrent_transactions_enabled() {
        // Explicitly enabled.
        let handler = make_stellar_tx_handler(relayer_with_concurrency(true), default_test_mocks());
        assert!(handler.concurrent_transactions_enabled());

        // Explicitly disabled.
        let handler =
            make_stellar_tx_handler(relayer_with_concurrency(false), default_test_mocks());
        assert!(!handler.concurrent_transactions_enabled());

        // Untouched test relayer: expected to match the crate default.
        let handler = make_stellar_tx_handler(create_test_relayer(), default_test_mocks());
        assert_eq!(
            handler.concurrent_transactions_enabled(),
            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
        );
    }

    #[tokio::test]
    async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
        let relayer = relayer_with_concurrency(true);
        let mut mocks = default_test_mocks();

        // With concurrency enabled neither the lookup nor the enqueue may happen.
        mocks.tx_repo.expect_find_by_status().times(0);
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .times(0);

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_reset_transaction_for_retry() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Start from a transaction that already carries prepare/sign artefacts.
        let mut tx = create_test_transaction(&relayer.id);
        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
            data.sequence_number = Some(42);
            data.signatures.push(dummy_signature());
            data.hash = Some("test-hash".to_string());
            data.signed_envelope_xdr = Some("test-xdr".to_string());
        }

        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, upd| {
                tx_id == "tx-1"
                    && upd.status == Some(TransactionStatus::Pending)
                    && upd.sent_at.is_none()
                    && upd.confirmed_at.is_none()
            })
            .times(1)
            .returning(|id, upd| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = id;
                tx.status = upd.status.unwrap();
                if let Some(network_data) = upd.network_data {
                    tx.network_data = network_data;
                }
                Ok::<_, RepositoryError>(tx)
            });

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler.reset_transaction_for_retry(tx).await;
        assert!(result.is_ok());

        let reset_tx = result.unwrap();
        assert_eq!(reset_tx.status, TransactionStatus::Pending);

        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
            assert!(data.sequence_number.is_none());
            assert!(data.signatures.is_empty());
            assert!(data.hash.is_none());
            assert!(data.signed_envelope_xdr.is_none());
        } else {
            panic!("Expected Stellar transaction data");
        }
    }
}