openzeppelin_relayer/domain/transaction/stellar/
submit.rs1use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{is_final_state, utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10 constants::STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
11 jobs::JobProducerTrait,
12 models::{
13 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15 },
16 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17 services::{provider::StellarProviderTrait, signer::Signer},
18 utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
22where
23 R: Repository<RelayerRepoModel, String> + Send + Sync,
24 T: TransactionRepository + Send + Sync,
25 J: JobProducerTrait + Send + Sync,
26 S: Signer + Send + Sync,
27 P: StellarProviderTrait + Send + Sync,
28 C: TransactionCounterTrait + Send + Sync,
29 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
30{
31 pub async fn submit_transaction_impl(
34 &self,
35 tx: TransactionRepoModel,
36 ) -> Result<TransactionRepoModel, TransactionError> {
37 info!(tx_id = %tx.id, status = ?tx.status, "submitting stellar transaction");
38
39 if is_final_state(&tx.status) {
41 warn!(
42 tx_id = %tx.id,
43 status = ?tx.status,
44 "transaction already in final state, skipping submission"
45 );
46 return Ok(tx);
47 }
48
49 match self.submit_core(tx.clone()).await {
51 Ok(submitted_tx) => Ok(submitted_tx),
52 Err(error) => {
53 self.handle_submit_failure(tx, error).await
55 }
56 }
57 }
58
59 async fn submit_core(
61 &self,
62 tx: TransactionRepoModel,
63 ) -> Result<TransactionRepoModel, TransactionError> {
64 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
65 let tx_envelope = stellar_data
66 .get_envelope_for_submission()
67 .map_err(TransactionError::from)?;
68
69 let hash = self
70 .provider()
71 .send_transaction(&tx_envelope)
72 .await
73 .map_err(TransactionError::from)?;
74
75 let tx_hash_hex = hex::encode(hash.as_slice());
76 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
77
78 let mut hashes = tx.hashes.clone();
79 hashes.push(tx_hash_hex);
80
81 let update_req = TransactionUpdateRequest {
82 status: Some(TransactionStatus::Submitted),
83 sent_at: Some(Utc::now().to_rfc3339()),
84 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
85 hashes: Some(hashes),
86 ..Default::default()
87 };
88
89 let updated_tx = self
90 .transaction_repository()
91 .partial_update(tx.id.clone(), update_req)
92 .await?;
93
94 self.send_transaction_update_notification(&updated_tx).await;
96
97 Ok(updated_tx)
98 }
99
100 async fn handle_submit_failure(
103 &self,
104 tx: TransactionRepoModel,
105 error: TransactionError,
106 ) -> Result<TransactionRepoModel, TransactionError> {
107 let error_reason = format!("Submission failed: {error}");
108 let tx_id = tx.id.clone();
109 warn!(reason = %error_reason, "transaction submission failed");
110
111 if is_bad_sequence_error(&error_reason) {
112 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
114 info!("syncing sequence from chain after bad sequence error");
115 match self
116 .sync_sequence_from_chain(&stellar_data.source_account)
117 .await
118 {
119 Ok(()) => {
120 info!("successfully synced sequence from chain");
121 }
122 Err(sync_error) => {
123 warn!(error = %sync_error, "failed to sync sequence from chain");
124 }
125 }
126 }
127
128 info!("bad sequence error detected, resetting and re-enqueueing");
130
131 match self.reset_transaction_for_retry(tx.clone()).await {
133 Ok(reset_tx) => {
134 if let Err(e) = self
136 .send_transaction_request_job(
137 &reset_tx,
138 Some(calculate_scheduled_timestamp(
139 STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
140 )),
141 )
142 .await
143 {
144 warn!(error = %e, "failed to re-enqueue transaction after reset");
145 } else {
146 info!("transaction reset and re-enqueued for retry through pipeline");
147 }
148
149 return Ok(reset_tx);
151 }
152 Err(reset_error) => {
153 warn!(error = %reset_error, "failed to reset transaction for retry");
154 }
156 }
157 }
158
159 let update_request = TransactionUpdateRequest {
162 status: Some(TransactionStatus::Failed),
163 status_reason: Some(error_reason.clone()),
164 ..Default::default()
165 };
166 let _failed_tx = match self
167 .finalize_transaction_state(tx_id.clone(), update_request)
168 .await
169 {
170 Ok(updated_tx) => updated_tx,
171 Err(finalize_error) => {
172 warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
173 tx
174 }
175 };
176
177 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
179 warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
180 }
181
182 info!(error = %error_reason, "transaction submission failure handled");
183
184 Err(error)
185 }
186
187 pub async fn resubmit_transaction_impl(
189 &self,
190 tx: TransactionRepoModel,
191 ) -> Result<TransactionRepoModel, TransactionError> {
192 self.submit_transaction_impl(tx).await
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use soroban_rs::xdr::{Hash, WriteXdr};
200
201 use crate::domain::transaction::stellar::test_helpers::*;
202
203 mod submit_transaction_tests {
204 use crate::{models::RepositoryError, services::provider::ProviderError};
205
206 use super::*;
207
208 #[tokio::test]
209 async fn submit_transaction_happy_path() {
210 let relayer = create_test_relayer();
211 let mut mocks = default_test_mocks();
212
213 mocks
215 .provider
216 .expect_send_transaction()
217 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
218
219 mocks
221 .tx_repo
222 .expect_partial_update()
223 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
224 .returning(|id, upd| {
225 let mut tx = create_test_transaction("relayer-1");
226 tx.id = id;
227 tx.status = upd.status.unwrap();
228 Ok::<_, RepositoryError>(tx)
229 });
230
231 mocks
233 .job_producer
234 .expect_produce_send_notification_job()
235 .times(1)
236 .returning(|_, _| Box::pin(async { Ok(()) }));
237
238 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
239
240 let mut tx = create_test_transaction(&relayer.id);
241 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
242 d.signatures.push(dummy_signature());
243 }
244
245 let res = handler.submit_transaction_impl(tx).await.unwrap();
246 assert_eq!(res.status, TransactionStatus::Submitted);
247 }
248
249 #[tokio::test]
250 async fn submit_transaction_provider_error_marks_failed() {
251 let relayer = create_test_relayer();
252 let mut mocks = default_test_mocks();
253
254 mocks.provider.expect_send_transaction().returning(|_| {
256 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
257 });
258
259 mocks
261 .tx_repo
262 .expect_partial_update()
263 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
264 .returning(|id, upd| {
265 let mut tx = create_test_transaction("relayer-1");
266 tx.id = id;
267 tx.status = upd.status.unwrap();
268 Ok::<_, RepositoryError>(tx)
269 });
270
271 mocks
273 .job_producer
274 .expect_produce_send_notification_job()
275 .times(1)
276 .returning(|_, _| Box::pin(async { Ok(()) }));
277
278 mocks
280 .tx_repo
281 .expect_find_by_status()
282 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
285 let mut tx = create_test_transaction(&relayer.id);
286 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
287 data.signatures.push(dummy_signature());
288 data.sequence_number = Some(42); }
290
291 let res = handler.submit_transaction_impl(tx).await;
292
293 assert!(res.is_err());
295 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
296 }
297
298 #[tokio::test]
299 async fn submit_transaction_repository_error_marks_failed() {
300 let relayer = create_test_relayer();
301 let mut mocks = default_test_mocks();
302
303 mocks
305 .provider
306 .expect_send_transaction()
307 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
308
309 mocks
311 .tx_repo
312 .expect_partial_update()
313 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
314 .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
315
316 mocks
318 .tx_repo
319 .expect_partial_update()
320 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
321 .returning(|id, upd| {
322 let mut tx = create_test_transaction("relayer-1");
323 tx.id = id;
324 tx.status = upd.status.unwrap();
325 Ok::<_, RepositoryError>(tx)
326 });
327
328 mocks
330 .job_producer
331 .expect_produce_send_notification_job()
332 .times(1)
333 .returning(|_, _| Box::pin(async { Ok(()) }));
334
335 mocks
337 .tx_repo
338 .expect_find_by_status()
339 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
342 let mut tx = create_test_transaction(&relayer.id);
343 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
344 data.signatures.push(dummy_signature());
345 data.sequence_number = Some(42); }
347
348 let res = handler.submit_transaction_impl(tx).await;
349
350 assert!(res.is_err());
352 }
353
354 #[tokio::test]
355 async fn submit_transaction_uses_signed_envelope_xdr() {
356 let relayer = create_test_relayer();
357 let mut mocks = default_test_mocks();
358
359 let mut tx = create_test_transaction(&relayer.id);
361 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
362 data.signatures.push(dummy_signature());
363 let envelope = data.get_envelope_for_submission().unwrap();
365 let xdr = envelope
366 .to_xdr_base64(soroban_rs::xdr::Limits::none())
367 .unwrap();
368 data.signed_envelope_xdr = Some(xdr);
369 }
370
371 mocks
373 .provider
374 .expect_send_transaction()
375 .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
376
377 mocks
379 .tx_repo
380 .expect_partial_update()
381 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
382 .returning(|id, upd| {
383 let mut tx = create_test_transaction("relayer-1");
384 tx.id = id;
385 tx.status = upd.status.unwrap();
386 Ok::<_, RepositoryError>(tx)
387 });
388
389 mocks
391 .job_producer
392 .expect_produce_send_notification_job()
393 .times(1)
394 .returning(|_, _| Box::pin(async { Ok(()) }));
395
396 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
397 let res = handler.submit_transaction_impl(tx).await.unwrap();
398
399 assert_eq!(res.status, TransactionStatus::Submitted);
400 }
401
402 #[tokio::test]
403 async fn resubmit_transaction_delegates_to_submit() {
404 let relayer = create_test_relayer();
405 let mut mocks = default_test_mocks();
406
407 mocks
409 .provider
410 .expect_send_transaction()
411 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
412
413 mocks
415 .tx_repo
416 .expect_partial_update()
417 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
418 .returning(|id, upd| {
419 let mut tx = create_test_transaction("relayer-1");
420 tx.id = id;
421 tx.status = upd.status.unwrap();
422 Ok::<_, RepositoryError>(tx)
423 });
424
425 mocks
427 .job_producer
428 .expect_produce_send_notification_job()
429 .times(1)
430 .returning(|_, _| Box::pin(async { Ok(()) }));
431
432 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
433
434 let mut tx = create_test_transaction(&relayer.id);
435 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
436 d.signatures.push(dummy_signature());
437 }
438
439 let res = handler.resubmit_transaction_impl(tx).await.unwrap();
440 assert_eq!(res.status, TransactionStatus::Submitted);
441 }
442
443 #[tokio::test]
444 async fn submit_transaction_failure_enqueues_next_transaction() {
445 let relayer = create_test_relayer();
446 let mut mocks = default_test_mocks();
447
448 mocks.provider.expect_send_transaction().returning(|_| {
450 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
451 });
452
453 mocks
457 .tx_repo
458 .expect_partial_update()
459 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
460 .returning(|id, upd| {
461 let mut tx = create_test_transaction("relayer-1");
462 tx.id = id;
463 tx.status = upd.status.unwrap();
464 Ok::<_, RepositoryError>(tx)
465 });
466
467 mocks
469 .job_producer
470 .expect_produce_send_notification_job()
471 .times(1)
472 .returning(|_, _| Box::pin(async { Ok(()) }));
473
474 let mut pending_tx = create_test_transaction(&relayer.id);
476 pending_tx.id = "next-pending-tx".to_string();
477 pending_tx.status = TransactionStatus::Pending;
478 let captured_pending_tx = pending_tx.clone();
479 mocks
480 .tx_repo
481 .expect_find_by_status()
482 .with(
483 mockall::predicate::eq(relayer.id.clone()),
484 mockall::predicate::eq(vec![TransactionStatus::Pending]),
485 )
486 .times(1)
487 .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
488
489 mocks
491 .job_producer
492 .expect_produce_transaction_request_job()
493 .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
494 .times(1)
495 .returning(|_, _| Box::pin(async { Ok(()) }));
496
497 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
498 let mut tx = create_test_transaction(&relayer.id);
499 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
500 data.signatures.push(dummy_signature());
501 data.sequence_number = Some(42); }
503
504 let res = handler.submit_transaction_impl(tx).await;
505
506 assert!(res.is_err());
508 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
509 }
510
511 #[tokio::test]
512 async fn test_submit_bad_sequence_resets_and_retries() {
513 let relayer = create_test_relayer();
514 let mut mocks = default_test_mocks();
515
516 mocks.provider.expect_send_transaction().returning(|_| {
518 Box::pin(async {
519 Err(ProviderError::Other(
520 "transaction submission failed: TxBadSeq".to_string(),
521 ))
522 })
523 });
524
525 mocks.provider.expect_get_account().times(1).returning(|_| {
527 Box::pin(async {
528 use soroban_rs::xdr::{
529 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
530 String32, Thresholds, Uint256,
531 };
532 use stellar_strkey::ed25519;
533
534 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
535 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
536
537 Ok(AccountEntry {
538 account_id,
539 balance: 1000000,
540 seq_num: SequenceNumber(100),
541 num_sub_entries: 0,
542 inflation_dest: None,
543 flags: 0,
544 home_domain: String32::default(),
545 thresholds: Thresholds([1, 1, 1, 1]),
546 signers: Default::default(),
547 ext: AccountEntryExt::V0,
548 })
549 })
550 });
551
552 mocks
554 .counter
555 .expect_set()
556 .times(1)
557 .returning(|_, _, _| Box::pin(async { Ok(()) }));
558
559 mocks
561 .tx_repo
562 .expect_partial_update()
563 .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
564 .times(1)
565 .returning(|id, upd| {
566 let mut tx = create_test_transaction("relayer-1");
567 tx.id = id;
568 tx.status = upd.status.unwrap();
569 if let Some(network_data) = upd.network_data {
570 tx.network_data = network_data;
571 }
572 Ok::<_, RepositoryError>(tx)
573 });
574
575 mocks
577 .job_producer
578 .expect_produce_transaction_request_job()
579 .times(1)
580 .returning(|_, _| Box::pin(async { Ok(()) }));
581
582 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
583 let mut tx = create_test_transaction(&relayer.id);
584 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
585 data.signatures.push(dummy_signature());
586 data.sequence_number = Some(42);
587 }
588
589 let result = handler.submit_transaction_impl(tx).await;
590
591 assert!(result.is_ok());
593 let reset_tx = result.unwrap();
594 assert_eq!(reset_tx.status, TransactionStatus::Pending);
595
596 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
598 assert!(data.sequence_number.is_none());
599 assert!(data.signatures.is_empty());
600 assert!(data.hash.is_none());
601 assert!(data.signed_envelope_xdr.is_none());
602 } else {
603 panic!("Expected Stellar transaction data");
604 }
605 }
606 }
607}