openzeppelin_relayer/domain/transaction/stellar/
stellar_transaction.rs

1/// This module defines the `StellarRelayerTransaction` struct and its associated
2/// functionality for handling Stellar transactions.
3/// It includes methods for preparing, submitting, handling status, and
4/// managing notifications for transactions. The module leverages various
5/// services and repositories to perform these operations asynchronously.
6use crate::{
7    constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8    domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9    jobs::{JobProducer, JobProducerTrait, TransactionRequest},
10    models::{
11        produce_transaction_update_notification_payload, NetworkTransactionRequest,
12        RelayerNetworkPolicy, RelayerRepoModel, TransactionError, TransactionRepoModel,
13        TransactionStatus, TransactionUpdateRequest,
14    },
15    repositories::{
16        RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17        TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18    },
19    services::{
20        provider::{StellarProvider, StellarProviderTrait},
21        signer::{Signer, StellarSigner},
22        stellar_dex::{OrderBookService, StellarDexServiceTrait},
23    },
24    utils::calculate_scheduled_timestamp,
25};
26use async_trait::async_trait;
27use std::sync::Arc;
28use tracing::{error, info};
29
30use super::lane_gate;
31
32#[allow(dead_code)]
33pub struct StellarRelayerTransaction<R, T, J, S, P, C, D>
34where
35    R: Repository<RelayerRepoModel, String>,
36    T: TransactionRepository,
37    J: JobProducerTrait,
38    S: Signer,
39    P: StellarProviderTrait,
40    C: TransactionCounterTrait,
41    D: StellarDexServiceTrait + Send + Sync + 'static,
42{
43    relayer: RelayerRepoModel,
44    relayer_repository: Arc<R>,
45    transaction_repository: Arc<T>,
46    job_producer: Arc<J>,
47    signer: Arc<S>,
48    provider: P,
49    transaction_counter_service: Arc<C>,
50    dex_service: Arc<D>,
51}
52
53#[allow(dead_code)]
54impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
55where
56    R: Repository<RelayerRepoModel, String>,
57    T: TransactionRepository,
58    J: JobProducerTrait,
59    S: Signer,
60    P: StellarProviderTrait,
61    C: TransactionCounterTrait,
62    D: StellarDexServiceTrait + Send + Sync + 'static,
63{
64    /// Creates a new `StellarRelayerTransaction`.
65    ///
66    /// # Arguments
67    ///
68    /// * `relayer` - The relayer model.
69    /// * `relayer_repository` - Storage for relayer repository.
70    /// * `transaction_repository` - Storage for transaction repository.
71    /// * `job_producer` - Producer for job queue.
72    /// * `signer` - The Stellar signer.
73    /// * `provider` - The Stellar provider.
74    /// * `transaction_counter_service` - Service for managing transaction counters.
75    /// * `dex_service` - The DEX service implementation for swap operations and validations.
76    ///
77    /// # Returns
78    ///
79    /// A result containing the new `StellarRelayerTransaction` or a `TransactionError`.
80    #[allow(clippy::too_many_arguments)]
81    pub fn new(
82        relayer: RelayerRepoModel,
83        relayer_repository: Arc<R>,
84        transaction_repository: Arc<T>,
85        job_producer: Arc<J>,
86        signer: Arc<S>,
87        provider: P,
88        transaction_counter_service: Arc<C>,
89        dex_service: Arc<D>,
90    ) -> Result<Self, TransactionError> {
91        Ok(Self {
92            relayer,
93            relayer_repository,
94            transaction_repository,
95            job_producer,
96            signer,
97            provider,
98            transaction_counter_service,
99            dex_service,
100        })
101    }
102
103    pub fn provider(&self) -> &P {
104        &self.provider
105    }
106
107    pub fn relayer(&self) -> &RelayerRepoModel {
108        &self.relayer
109    }
110
111    pub fn job_producer(&self) -> &J {
112        &self.job_producer
113    }
114
115    pub fn transaction_repository(&self) -> &T {
116        &self.transaction_repository
117    }
118
119    pub fn signer(&self) -> &S {
120        &self.signer
121    }
122
123    pub fn transaction_counter_service(&self) -> &C {
124        &self.transaction_counter_service
125    }
126
127    pub fn dex_service(&self) -> &D {
128        &self.dex_service
129    }
130
131    pub fn concurrent_transactions_enabled(&self) -> bool {
132        if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
133            policy
134                .concurrent_transactions
135                .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
136        } else {
137            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
138        }
139    }
140
141    /// Send a transaction-request job for the given transaction.
142    pub async fn send_transaction_request_job(
143        &self,
144        tx: &TransactionRepoModel,
145        delay_seconds: Option<i64>,
146    ) -> Result<(), TransactionError> {
147        let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
148        let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
149        self.job_producer()
150            .produce_transaction_request_job(job, scheduled_on)
151            .await?;
152        Ok(())
153    }
154
155    /// Sends a transaction update notification if a notification ID is configured.
156    ///
157    /// This is a best-effort operation that logs errors but does not propagate them,
158    /// as notification failures should not affect the transaction lifecycle.
159    pub(super) async fn send_transaction_update_notification(&self, tx: &TransactionRepoModel) {
160        if let Some(notification_id) = &self.relayer().notification_id {
161            if let Err(e) = self
162                .job_producer()
163                .produce_send_notification_job(
164                    produce_transaction_update_notification_payload(notification_id, tx),
165                    None,
166                )
167                .await
168            {
169                error!(error = %e, "failed to produce notification job");
170            }
171        }
172    }
173
174    /// Helper function to update transaction status, save it, and send a notification.
175    pub async fn finalize_transaction_state(
176        &self,
177        tx_id: String,
178        update_req: TransactionUpdateRequest,
179    ) -> Result<TransactionRepoModel, TransactionError> {
180        let updated_tx = self
181            .transaction_repository()
182            .partial_update(tx_id, update_req)
183            .await?;
184
185        self.send_transaction_update_notification(&updated_tx).await;
186        Ok(updated_tx)
187    }
188
189    pub async fn enqueue_next_pending_transaction(
190        &self,
191        finished_tx_id: &str,
192    ) -> Result<(), TransactionError> {
193        if !self.concurrent_transactions_enabled() {
194            if let Some(next) = self
195                .find_oldest_pending_for_relayer(&self.relayer().id)
196                .await?
197            {
198                // Atomic hand-over while still owning the lane
199                info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
200                lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
201                self.send_transaction_request_job(&next, None).await?;
202            } else {
203                info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
204                lane_gate::free(&self.relayer().id, finished_tx_id);
205            }
206        }
207        Ok(())
208    }
209
210    /// Finds the oldest pending transaction for a relayer.
211    async fn find_oldest_pending_for_relayer(
212        &self,
213        relayer_id: &str,
214    ) -> Result<Option<TransactionRepoModel>, TransactionError> {
215        let pending_txs = self
216            .transaction_repository()
217            .find_by_status(relayer_id, &[TransactionStatus::Pending])
218            .await
219            .map_err(TransactionError::from)?;
220
221        Ok(pending_txs.into_iter().next())
222    }
223
224    /// Syncs the sequence number from the blockchain for the relayer's address.
225    /// This fetches the on-chain sequence number and updates the local counter to the next usable value.
226    pub async fn sync_sequence_from_chain(
227        &self,
228        relayer_address: &str,
229    ) -> Result<(), TransactionError> {
230        info!(address = %relayer_address, "syncing sequence number from chain");
231
232        // Use the shared helper to fetch the next sequence
233        let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
234            .await
235            .map_err(TransactionError::UnexpectedError)?;
236
237        // Update the local counter to the next usable sequence
238        self.transaction_counter_service()
239            .set(&self.relayer().id, relayer_address, next_usable_seq)
240            .await
241            .map_err(|e| {
242                TransactionError::UnexpectedError(format!("Failed to update sequence counter: {e}"))
243            })?;
244
245        info!(sequence = %next_usable_seq, "updated local sequence counter");
246        Ok(())
247    }
248
249    /// Resets a transaction to its pre-prepare state for reprocessing through the pipeline.
250    /// This is used when a transaction fails with a bad sequence error and needs to be retried.
251    pub async fn reset_transaction_for_retry(
252        &self,
253        tx: TransactionRepoModel,
254    ) -> Result<TransactionRepoModel, TransactionError> {
255        info!("resetting transaction for retry through pipeline");
256
257        // Use the model's built-in reset method
258        let update_req = tx.create_reset_update_request()?;
259
260        // Update the transaction
261        let reset_tx = self
262            .transaction_repository()
263            .partial_update(tx.id.clone(), update_req)
264            .await?;
265
266        info!("transaction reset successfully to pre-prepare state");
267        Ok(reset_tx)
268    }
269}
270
271#[async_trait]
272impl<R, T, J, S, P, C, D> Transaction for StellarRelayerTransaction<R, T, J, S, P, C, D>
273where
274    R: Repository<RelayerRepoModel, String> + Send + Sync,
275    T: TransactionRepository + Send + Sync,
276    J: JobProducerTrait + Send + Sync,
277    S: Signer + Send + Sync,
278    P: StellarProviderTrait + Send + Sync,
279    C: TransactionCounterTrait + Send + Sync,
280    D: StellarDexServiceTrait + Send + Sync + 'static,
281{
282    async fn prepare_transaction(
283        &self,
284        tx: TransactionRepoModel,
285    ) -> Result<TransactionRepoModel, TransactionError> {
286        self.prepare_transaction_impl(tx).await
287    }
288
289    async fn submit_transaction(
290        &self,
291        tx: TransactionRepoModel,
292    ) -> Result<TransactionRepoModel, TransactionError> {
293        self.submit_transaction_impl(tx).await
294    }
295
296    async fn resubmit_transaction(
297        &self,
298        tx: TransactionRepoModel,
299    ) -> Result<TransactionRepoModel, TransactionError> {
300        Ok(tx)
301    }
302
303    async fn handle_transaction_status(
304        &self,
305        tx: TransactionRepoModel,
306    ) -> Result<TransactionRepoModel, TransactionError> {
307        self.handle_transaction_status_impl(tx).await
308    }
309
310    async fn cancel_transaction(
311        &self,
312        tx: TransactionRepoModel,
313    ) -> Result<TransactionRepoModel, TransactionError> {
314        Ok(tx)
315    }
316
317    async fn replace_transaction(
318        &self,
319        _old_tx: TransactionRepoModel,
320        _new_tx_request: NetworkTransactionRequest,
321    ) -> Result<TransactionRepoModel, TransactionError> {
322        Ok(_old_tx)
323    }
324
325    async fn sign_transaction(
326        &self,
327        tx: TransactionRepoModel,
328    ) -> Result<TransactionRepoModel, TransactionError> {
329        Ok(tx)
330    }
331
332    async fn validate_transaction(
333        &self,
334        _tx: TransactionRepoModel,
335    ) -> Result<bool, TransactionError> {
336        Ok(true)
337    }
338}
339
340pub type DefaultStellarTransaction = StellarRelayerTransaction<
341    RelayerRepositoryStorage,
342    TransactionRepositoryStorage,
343    JobProducer,
344    StellarSigner,
345    StellarProvider,
346    TransactionCounterRepositoryStorage,
347    OrderBookService<StellarProvider, StellarSigner>,
348>;
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353    use crate::{
354        models::{NetworkTransactionData, RepositoryError},
355        services::provider::ProviderError,
356    };
357    use std::sync::Arc;
358
359    use crate::domain::transaction::stellar::test_helpers::*;
360
361    #[test]
362    fn new_returns_ok() {
363        let relayer = create_test_relayer();
364        let mocks = default_test_mocks();
365        let result = StellarRelayerTransaction::new(
366            relayer,
367            Arc::new(mocks.relayer_repo),
368            Arc::new(mocks.tx_repo),
369            Arc::new(mocks.job_producer),
370            Arc::new(mocks.signer),
371            mocks.provider,
372            Arc::new(mocks.counter),
373            Arc::new(mocks.dex_service),
374        );
375        assert!(result.is_ok());
376    }
377
378    #[test]
379    fn accessor_methods_return_correct_references() {
380        let relayer = create_test_relayer();
381        let mocks = default_test_mocks();
382        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
383
384        // Test all accessor methods
385        assert_eq!(handler.relayer().id, "relayer-1");
386        assert_eq!(handler.relayer().address, TEST_PK);
387
388        // These should not panic and return valid references
389        let _ = handler.provider();
390        let _ = handler.job_producer();
391        let _ = handler.transaction_repository();
392        let _ = handler.signer();
393        let _ = handler.transaction_counter_service();
394    }
395
396    #[tokio::test]
397    async fn send_transaction_request_job_success() {
398        let relayer = create_test_relayer();
399        let mut mocks = default_test_mocks();
400
401        mocks
402            .job_producer
403            .expect_produce_transaction_request_job()
404            .withf(|job, delay| {
405                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
406            })
407            .times(1)
408            .returning(|_, _| Box::pin(async { Ok(()) }));
409
410        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
411        let tx = create_test_transaction(&relayer.id);
412
413        let result = handler.send_transaction_request_job(&tx, None).await;
414        assert!(result.is_ok());
415    }
416
417    #[tokio::test]
418    async fn send_transaction_request_job_with_delay() {
419        let relayer = create_test_relayer();
420        let mut mocks = default_test_mocks();
421
422        mocks
423            .job_producer
424            .expect_produce_transaction_request_job()
425            .withf(|job, delay| {
426                job.transaction_id == "tx-1"
427                    && job.relayer_id == "relayer-1"
428                    && delay.is_some()
429                    && delay.unwrap() > chrono::Utc::now().timestamp()
430            })
431            .times(1)
432            .returning(|_, _| Box::pin(async { Ok(()) }));
433
434        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
435        let tx = create_test_transaction(&relayer.id);
436
437        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
438        assert!(result.is_ok());
439    }
440
441    #[tokio::test]
442    async fn finalize_transaction_state_success() {
443        let relayer = create_test_relayer();
444        let mut mocks = default_test_mocks();
445
446        // Mock repository update
447        mocks
448            .tx_repo
449            .expect_partial_update()
450            .withf(|tx_id, update| {
451                tx_id == "tx-1"
452                    && update.status == Some(TransactionStatus::Confirmed)
453                    && update.status_reason == Some("Transaction confirmed".to_string())
454            })
455            .times(1)
456            .returning(|tx_id, update| {
457                let mut tx = create_test_transaction("relayer-1");
458                tx.id = tx_id;
459                tx.status = update.status.unwrap();
460                tx.status_reason = update.status_reason;
461                tx.confirmed_at = update.confirmed_at;
462                Ok::<_, RepositoryError>(tx)
463            });
464
465        // Mock notification
466        mocks
467            .job_producer
468            .expect_produce_send_notification_job()
469            .times(1)
470            .returning(|_, _| Box::pin(async { Ok(()) }));
471
472        let handler = make_stellar_tx_handler(relayer, mocks);
473
474        let update_request = TransactionUpdateRequest {
475            status: Some(TransactionStatus::Confirmed),
476            status_reason: Some("Transaction confirmed".to_string()),
477            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
478            ..Default::default()
479        };
480
481        let result = handler
482            .finalize_transaction_state("tx-1".to_string(), update_request)
483            .await;
484
485        assert!(result.is_ok());
486        let updated_tx = result.unwrap();
487        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
488        assert_eq!(
489            updated_tx.status_reason,
490            Some("Transaction confirmed".to_string())
491        );
492    }
493
494    #[tokio::test]
495    async fn enqueue_next_pending_transaction_with_pending_tx() {
496        let relayer = create_test_relayer();
497        let mut mocks = default_test_mocks();
498
499        // Mock finding a pending transaction
500        let mut pending_tx = create_test_transaction(&relayer.id);
501        pending_tx.id = "pending-tx-1".to_string();
502
503        mocks
504            .tx_repo
505            .expect_find_by_status()
506            .withf(|relayer_id, statuses| {
507                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
508            })
509            .times(1)
510            .returning(move |_, _| {
511                let mut tx = create_test_transaction("relayer-1");
512                tx.id = "pending-tx-1".to_string();
513                Ok(vec![tx])
514            });
515
516        // Mock job production for the next transaction
517        mocks
518            .job_producer
519            .expect_produce_transaction_request_job()
520            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
521            .times(1)
522            .returning(|_, _| Box::pin(async { Ok(()) }));
523
524        let handler = make_stellar_tx_handler(relayer, mocks);
525
526        let result = handler
527            .enqueue_next_pending_transaction("finished-tx")
528            .await;
529        assert!(result.is_ok());
530    }
531
532    #[tokio::test]
533    async fn enqueue_next_pending_transaction_no_pending_tx() {
534        let relayer = create_test_relayer();
535        let mut mocks = default_test_mocks();
536
537        // Mock finding no pending transactions
538        mocks
539            .tx_repo
540            .expect_find_by_status()
541            .times(1)
542            .returning(|_, _| Ok(vec![]));
543
544        let handler = make_stellar_tx_handler(relayer, mocks);
545
546        let result = handler
547            .enqueue_next_pending_transaction("finished-tx")
548            .await;
549        assert!(result.is_ok());
550    }
551
552    #[tokio::test]
553    async fn test_sync_sequence_from_chain() {
554        let relayer = create_test_relayer();
555        let mut mocks = default_test_mocks();
556
557        // Mock provider to return account with sequence 100
558        mocks
559            .provider
560            .expect_get_account()
561            .withf(|addr| addr == TEST_PK)
562            .times(1)
563            .returning(|_| {
564                Box::pin(async {
565                    use soroban_rs::xdr::{
566                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
567                        String32, Thresholds, Uint256,
568                    };
569                    use stellar_strkey::ed25519;
570
571                    // Create a dummy public key for account ID
572                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
573                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
574
575                    Ok(AccountEntry {
576                        account_id,
577                        balance: 1000000,
578                        seq_num: SequenceNumber(100),
579                        num_sub_entries: 0,
580                        inflation_dest: None,
581                        flags: 0,
582                        home_domain: String32::default(),
583                        thresholds: Thresholds([1, 1, 1, 1]),
584                        signers: Default::default(),
585                        ext: AccountEntryExt::V0,
586                    })
587                })
588            });
589
590        // Mock counter set to verify it's called with next usable sequence (101)
591        mocks
592            .counter
593            .expect_set()
594            .withf(|relayer_id, addr, seq| {
595                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
596            })
597            .times(1)
598            .returning(|_, _, _| Box::pin(async { Ok(()) }));
599
600        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
601
602        let result = handler.sync_sequence_from_chain(&relayer.address).await;
603        assert!(result.is_ok());
604    }
605
606    #[tokio::test]
607    async fn test_sync_sequence_from_chain_provider_error() {
608        let relayer = create_test_relayer();
609        let mut mocks = default_test_mocks();
610
611        // Mock provider to fail
612        mocks.provider.expect_get_account().times(1).returning(|_| {
613            Box::pin(async { Err(ProviderError::Other("Account not found".to_string())) })
614        });
615
616        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
617
618        let result = handler.sync_sequence_from_chain(&relayer.address).await;
619        assert!(result.is_err());
620        match result.unwrap_err() {
621            TransactionError::UnexpectedError(msg) => {
622                assert!(msg.contains("Failed to fetch account from chain"));
623            }
624            _ => panic!("Expected UnexpectedError"),
625        }
626    }
627
628    #[tokio::test]
629    async fn test_sync_sequence_from_chain_counter_error() {
630        let relayer = create_test_relayer();
631        let mut mocks = default_test_mocks();
632
633        // Mock provider success
634        mocks.provider.expect_get_account().times(1).returning(|_| {
635            Box::pin(async {
636                use soroban_rs::xdr::{
637                    AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
638                    Thresholds, Uint256,
639                };
640                use stellar_strkey::ed25519;
641
642                // Create a dummy public key for account ID
643                let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
644                let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
645
646                Ok(AccountEntry {
647                    account_id,
648                    balance: 1000000,
649                    seq_num: SequenceNumber(100),
650                    num_sub_entries: 0,
651                    inflation_dest: None,
652                    flags: 0,
653                    home_domain: String32::default(),
654                    thresholds: Thresholds([1, 1, 1, 1]),
655                    signers: Default::default(),
656                    ext: AccountEntryExt::V0,
657                })
658            })
659        });
660
661        // Mock counter set to fail
662        mocks.counter.expect_set().times(1).returning(|_, _, _| {
663            Box::pin(async {
664                Err(RepositoryError::Unknown(
665                    "Counter update failed".to_string(),
666                ))
667            })
668        });
669
670        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
671
672        let result = handler.sync_sequence_from_chain(&relayer.address).await;
673        assert!(result.is_err());
674        match result.unwrap_err() {
675            TransactionError::UnexpectedError(msg) => {
676                assert!(msg.contains("Failed to update sequence counter"));
677            }
678            _ => panic!("Expected UnexpectedError"),
679        }
680    }
681
682    #[test]
683    fn test_concurrent_transactions_enabled() {
684        // Test with concurrent transactions explicitly enabled
685        let mut relayer = create_test_relayer();
686        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
687            policy.concurrent_transactions = Some(true);
688        }
689        let mocks = default_test_mocks();
690        let handler = make_stellar_tx_handler(relayer, mocks);
691        assert!(handler.concurrent_transactions_enabled());
692
693        // Test with concurrent transactions explicitly disabled
694        let mut relayer = create_test_relayer();
695        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
696            policy.concurrent_transactions = Some(false);
697        }
698        let mocks = default_test_mocks();
699        let handler = make_stellar_tx_handler(relayer, mocks);
700        assert!(!handler.concurrent_transactions_enabled());
701
702        // Test with default (None) - should use DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
703        let relayer = create_test_relayer();
704        let mocks = default_test_mocks();
705        let handler = make_stellar_tx_handler(relayer, mocks);
706        assert_eq!(
707            handler.concurrent_transactions_enabled(),
708            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
709        );
710    }
711
712    #[tokio::test]
713    async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
714        // With concurrent transactions enabled, lane management should be skipped
715        let mut relayer = create_test_relayer();
716        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
717            policy.concurrent_transactions = Some(true);
718        }
719        let mut mocks = default_test_mocks();
720
721        // Should NOT look for pending transactions when concurrency is enabled
722        mocks.tx_repo.expect_find_by_status().times(0); // Expect zero calls
723
724        // Should NOT produce any job when concurrency is enabled
725        mocks
726            .job_producer
727            .expect_produce_transaction_request_job()
728            .times(0); // Expect zero calls
729
730        let handler = make_stellar_tx_handler(relayer, mocks);
731
732        let result = handler
733            .enqueue_next_pending_transaction("finished-tx")
734            .await;
735        assert!(result.is_ok());
736    }
737
738    #[tokio::test]
739    async fn test_reset_transaction_for_retry() {
740        let relayer = create_test_relayer();
741        let mut mocks = default_test_mocks();
742
743        // Create a transaction with stellar data that has been prepared
744        let mut tx = create_test_transaction(&relayer.id);
745        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
746            data.sequence_number = Some(42);
747            data.signatures.push(dummy_signature());
748            data.hash = Some("test-hash".to_string());
749            data.signed_envelope_xdr = Some("test-xdr".to_string());
750        }
751
752        // Mock partial_update to reset transaction
753        mocks
754            .tx_repo
755            .expect_partial_update()
756            .withf(|tx_id, upd| {
757                tx_id == "tx-1"
758                    && upd.status == Some(TransactionStatus::Pending)
759                    && upd.sent_at.is_none()
760                    && upd.confirmed_at.is_none()
761            })
762            .times(1)
763            .returning(|id, upd| {
764                let mut tx = create_test_transaction("relayer-1");
765                tx.id = id;
766                tx.status = upd.status.unwrap();
767                if let Some(network_data) = upd.network_data {
768                    tx.network_data = network_data;
769                }
770                Ok::<_, RepositoryError>(tx)
771            });
772
773        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
774
775        let result = handler.reset_transaction_for_retry(tx).await;
776        assert!(result.is_ok());
777
778        let reset_tx = result.unwrap();
779        assert_eq!(reset_tx.status, TransactionStatus::Pending);
780
781        // Verify stellar data was reset
782        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
783            assert!(data.sequence_number.is_none());
784            assert!(data.signatures.is_empty());
785            assert!(data.hash.is_none());
786            assert!(data.signed_envelope_xdr.is_none());
787        } else {
788            panic!("Expected Stellar transaction data");
789        }
790    }
791}