openzeppelin_relayer/domain/transaction/stellar/
submit.rs

//! This module contains the submission-related functionality for Stellar transactions.
//! It includes methods for submitting transactions with robust error handling,
//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{is_final_state, utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10    constants::STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
11    jobs::JobProducerTrait,
12    models::{
13        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14        TransactionStatus, TransactionUpdateRequest,
15    },
16    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17    services::{provider::StellarProviderTrait, signer::Signer},
18    utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
22where
23    R: Repository<RelayerRepoModel, String> + Send + Sync,
24    T: TransactionRepository + Send + Sync,
25    J: JobProducerTrait + Send + Sync,
26    S: Signer + Send + Sync,
27    P: StellarProviderTrait + Send + Sync,
28    C: TransactionCounterTrait + Send + Sync,
29    D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
30{
31    /// Main submission method with robust error handling.
32    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
33    pub async fn submit_transaction_impl(
34        &self,
35        tx: TransactionRepoModel,
36    ) -> Result<TransactionRepoModel, TransactionError> {
37        info!(tx_id = %tx.id, status = ?tx.status, "submitting stellar transaction");
38
39        // Defensive check: if transaction is in a final state or unexpected state, don't retry
40        if is_final_state(&tx.status) {
41            warn!(
42                tx_id = %tx.id,
43                status = ?tx.status,
44                "transaction already in final state, skipping submission"
45            );
46            return Ok(tx);
47        }
48
49        // Call core submission logic with error handling
50        match self.submit_core(tx.clone()).await {
51            Ok(submitted_tx) => Ok(submitted_tx),
52            Err(error) => {
53                // Handle submission failure - mark as failed and send notification
54                self.handle_submit_failure(tx, error).await
55            }
56        }
57    }
58
59    /// Core submission logic - pure business logic without error handling concerns.
60    async fn submit_core(
61        &self,
62        tx: TransactionRepoModel,
63    ) -> Result<TransactionRepoModel, TransactionError> {
64        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
65        let tx_envelope = stellar_data
66            .get_envelope_for_submission()
67            .map_err(TransactionError::from)?;
68
69        let hash = self
70            .provider()
71            .send_transaction(&tx_envelope)
72            .await
73            .map_err(TransactionError::from)?;
74
75        let tx_hash_hex = hex::encode(hash.as_slice());
76        let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
77
78        let mut hashes = tx.hashes.clone();
79        hashes.push(tx_hash_hex);
80
81        let update_req = TransactionUpdateRequest {
82            status: Some(TransactionStatus::Submitted),
83            sent_at: Some(Utc::now().to_rfc3339()),
84            network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
85            hashes: Some(hashes),
86            ..Default::default()
87        };
88
89        let updated_tx = self
90            .transaction_repository()
91            .partial_update(tx.id.clone(), update_req)
92            .await?;
93
94        // Send notification
95        self.send_transaction_update_notification(&updated_tx).await;
96
97        Ok(updated_tx)
98    }
99
100    /// Handles submission failures with comprehensive cleanup and error reporting.
101    /// For bad sequence errors, resets the transaction and re-enqueues it for retry.
102    async fn handle_submit_failure(
103        &self,
104        tx: TransactionRepoModel,
105        error: TransactionError,
106    ) -> Result<TransactionRepoModel, TransactionError> {
107        let error_reason = format!("Submission failed: {error}");
108        let tx_id = tx.id.clone();
109        warn!(reason = %error_reason, "transaction submission failed");
110
111        if is_bad_sequence_error(&error_reason) {
112            // For bad sequence errors, sync sequence from chain first
113            if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
114                info!("syncing sequence from chain after bad sequence error");
115                match self
116                    .sync_sequence_from_chain(&stellar_data.source_account)
117                    .await
118                {
119                    Ok(()) => {
120                        info!("successfully synced sequence from chain");
121                    }
122                    Err(sync_error) => {
123                        warn!(error = %sync_error, "failed to sync sequence from chain");
124                    }
125                }
126            }
127
128            // Reset the transaction and re-enqueue it
129            info!("bad sequence error detected, resetting and re-enqueueing");
130
131            // Reset the transaction to pending state
132            match self.reset_transaction_for_retry(tx.clone()).await {
133                Ok(reset_tx) => {
134                    // Re-enqueue the transaction to go through the pipeline again
135                    if let Err(e) = self
136                        .send_transaction_request_job(
137                            &reset_tx,
138                            Some(calculate_scheduled_timestamp(
139                                STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
140                            )),
141                        )
142                        .await
143                    {
144                        warn!(error = %e, "failed to re-enqueue transaction after reset");
145                    } else {
146                        info!("transaction reset and re-enqueued for retry through pipeline");
147                    }
148
149                    // Return success since we're handling the retry
150                    return Ok(reset_tx);
151                }
152                Err(reset_error) => {
153                    warn!(error = %reset_error, "failed to reset transaction for retry");
154                    // Fall through to normal failure handling
155                }
156            }
157        }
158
159        // For non-bad-sequence errors or if reset failed, mark as failed
160        // Step 1: Mark transaction as Failed with detailed reason
161        let update_request = TransactionUpdateRequest {
162            status: Some(TransactionStatus::Failed),
163            status_reason: Some(error_reason.clone()),
164            ..Default::default()
165        };
166        let _failed_tx = match self
167            .finalize_transaction_state(tx_id.clone(), update_request)
168            .await
169        {
170            Ok(updated_tx) => updated_tx,
171            Err(finalize_error) => {
172                warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
173                tx
174            }
175        };
176
177        // Attempt to enqueue next pending transaction or release lane
178        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
179            warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
180        }
181
182        info!(error = %error_reason, "transaction submission failure handled");
183
184        Err(error)
185    }
186
187    /// Resubmit transaction - delegates to submit_transaction_impl
188    pub async fn resubmit_transaction_impl(
189        &self,
190        tx: TransactionRepoModel,
191    ) -> Result<TransactionRepoModel, TransactionError> {
192        self.submit_transaction_impl(tx).await
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use soroban_rs::xdr::{Hash, WriteXdr};

    use crate::domain::transaction::stellar::test_helpers::*;

    mod submit_transaction_tests {
        use crate::{models::RepositoryError, services::provider::ProviderError};

        use super::*;

        /// Stub response for a mocked `partial_update`: a fresh test transaction
        /// carrying the requested id and the status taken from the update.
        fn stored_tx_after_update(
            id: String,
            upd: TransactionUpdateRequest,
        ) -> Result<TransactionRepoModel, RepositoryError> {
            let mut stored = create_test_transaction("relayer-1");
            stored.id = id;
            stored.status = upd.status.unwrap();
            Ok(stored)
        }

        /// Test transaction for `relayer_id` with one dummy signature attached.
        fn signed_tx(relayer_id: &str) -> TransactionRepoModel {
            let mut tx = create_test_transaction(relayer_id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
            }
            tx
        }

        /// Same as `signed_tx`, with the sequence number additionally set to 42.
        fn signed_tx_with_sequence(relayer_id: &str) -> TransactionRepoModel {
            let mut tx = signed_tx(relayer_id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.sequence_number = Some(42);
            }
            tx
        }

        #[tokio::test]
        async fn submit_transaction_happy_path() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider accepts the envelope and hands back a hash.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            // The repository is asked to move the transaction to Submitted.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(stored_tx_after_update);

            // Exactly one notification goes out.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx(&relayer.id);

            let submitted = handler.submit_transaction_impl(tx).await.unwrap();
            assert_eq!(submitted.status, TransactionStatus::Submitted);
        }

        #[tokio::test]
        async fn submit_transaction_provider_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider rejects with an error that is not a bad-sequence error.
            mocks.provider.expect_send_transaction().returning(|_| {
                Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
            });

            // Failure handling updates the transaction to Failed.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(stored_tx_after_update);

            // One notification for the failed transaction.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // Nothing is pending when the next transaction is looked up.
            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx_with_sequence(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            // The original error is surfaced even though the failure was handled.
            assert!(res.is_err());
            // NOTE(review): the value of this `matches!` is discarded, so the error
            // variant is not actually asserted here — confirm the expected variant
            // and wrap it in `assert!` if a check is intended.
            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
        }

        #[tokio::test]
        async fn submit_transaction_repository_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider call itself succeeds.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            // Persisting the Submitted status fails.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));

            // The follow-up update to Failed goes through.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(stored_tx_after_update);

            // One notification for the failed transaction.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // Nothing is pending when the next transaction is looked up.
            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx_with_sequence(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            // The repository error is surfaced to the caller.
            assert!(res.is_err());
        }

        #[tokio::test]
        async fn submit_transaction_uses_signed_envelope_xdr() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Signed transaction whose `signed_envelope_xdr` holds the base64 XDR
            // of its own submission envelope.
            let mut tx = signed_tx(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                let envelope = data.get_envelope_for_submission().unwrap();
                let encoded = envelope
                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
                    .unwrap();
                data.signed_envelope_xdr = Some(encoded);
            }

            // The provider accepts whatever envelope it is given.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));

            // The repository is asked to move the transaction to Submitted.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(stored_tx_after_update);

            // Exactly one notification goes out.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let submitted = handler.submit_transaction_impl(tx).await.unwrap();

            assert_eq!(submitted.status, TransactionStatus::Submitted);
        }

        #[tokio::test]
        async fn resubmit_transaction_delegates_to_submit() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider accepts the envelope and hands back a hash.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            // The repository is asked to move the transaction to Submitted.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(stored_tx_after_update);

            // Exactly one notification goes out.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx(&relayer.id);

            let resubmitted = handler.resubmit_transaction_impl(tx).await.unwrap();
            assert_eq!(resubmitted.status, TransactionStatus::Submitted);
        }

        #[tokio::test]
        async fn submit_transaction_failure_enqueues_next_transaction() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider rejects with an error that is not a bad-sequence error,
            // so no sequence-sync mocks are registered.
            mocks.provider.expect_send_transaction().returning(|_| {
                Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
            });

            // Failure handling updates the transaction to Failed.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(stored_tx_after_update);

            // One notification for the failed transaction.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The pending lookup for this relayer yields one waiting transaction.
            let mut next_in_line = create_test_transaction(&relayer.id);
            next_in_line.id = "next-pending-tx".to_string();
            next_in_line.status = TransactionStatus::Pending;
            mocks
                .tx_repo
                .expect_find_by_status()
                .with(
                    mockall::predicate::eq(relayer.id.clone()),
                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
                )
                .times(1)
                .returning(move |_, _| Ok(vec![next_in_line.clone()]));

            // A transaction-request job is produced for exactly that transaction.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx_with_sequence(&relayer.id);

            let res = handler.submit_transaction_impl(tx).await;

            // The original error is surfaced; the mock expectations above verify
            // that the next transaction was enqueued.
            assert!(res.is_err());
            // NOTE(review): the value of this `matches!` is discarded, so the error
            // variant is not actually asserted here — confirm the expected variant
            // and wrap it in `assert!` if a check is intended.
            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
        }

        #[tokio::test]
        async fn test_submit_bad_sequence_resets_and_retries() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // The provider rejects with a bad-sequence error.
            mocks.provider.expect_send_transaction().returning(|_| {
                Box::pin(async {
                    Err(ProviderError::Other(
                        "transaction submission failed: TxBadSeq".to_string(),
                    ))
                })
            });

            // The account lookup used while syncing the sequence from chain.
            mocks.provider.expect_get_account().times(1).returning(|_| {
                Box::pin(async {
                    use soroban_rs::xdr::{
                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
                        String32, Thresholds, Uint256,
                    };
                    use stellar_strkey::ed25519;

                    let public_key = ed25519::PublicKey::from_string(TEST_PK).unwrap();
                    let account_id =
                        AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(public_key.0)));

                    Ok(AccountEntry {
                        account_id,
                        balance: 1000000,
                        seq_num: SequenceNumber(100),
                        num_sub_entries: 0,
                        inflation_dest: None,
                        flags: 0,
                        home_domain: String32::default(),
                        thresholds: Thresholds([1, 1, 1, 1]),
                        signers: Default::default(),
                        ext: AccountEntryExt::V0,
                    })
                })
            });

            // The counter is overwritten once during the sync.
            mocks
                .counter
                .expect_set()
                .times(1)
                .returning(|_, _, _| Box::pin(async { Ok(()) }));

            // The reset writes the transaction back as Pending; the stub echoes
            // the submitted network data so the test can inspect it afterwards.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
                .times(1)
                .returning(|id, upd| {
                    let mut stored = create_test_transaction("relayer-1");
                    stored.id = id;
                    stored.status = upd.status.unwrap();
                    if let Some(network_data) = upd.network_data {
                        stored.network_data = network_data;
                    }
                    Ok::<_, RepositoryError>(stored)
                });

            // The reset transaction is re-enqueued exactly once.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let tx = signed_tx_with_sequence(&relayer.id);

            let result = handler.submit_transaction_impl(tx).await;

            // The retry is handled internally, so the caller sees success.
            assert!(result.is_ok());
            let reset_tx = result.unwrap();
            assert_eq!(reset_tx.status, TransactionStatus::Pending);

            // All submission-specific Stellar fields were cleared by the reset.
            match &reset_tx.network_data {
                NetworkTransactionData::Stellar(data) => {
                    assert!(data.sequence_number.is_none());
                    assert!(data.signatures.is_empty());
                    assert!(data.hash.is_none());
                    assert!(data.signed_envelope_xdr.is_none());
                }
                _ => panic!("Expected Stellar transaction data"),
            }
        }
    }
}