openzeppelin_relayer/jobs/handlers/
token_swap_request_handler.rs

1//! Unified swap request handling worker implementation.
2//!
3//! This module implements the token swap request handling worker that processes
4//! swap jobs from the queue for all supported networks (Solana and Stellar).
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, Error};
8use eyre::Result as EyreResult;
9use tracing::{debug, info, instrument};
10
11use crate::{
12    constants::WORKER_TOKEN_SWAP_REQUEST_RETRIES,
13    domain::get_network_relayer,
14    jobs::{handle_result, Job, TokenSwapRequest},
15    models::DefaultAppState,
16    observability::request_id::set_request_id,
17};
18
19/// Handles incoming swap jobs from the queue.
20///
21/// # Arguments
22/// * `job` - The swap job containing relayer ID
23/// * `context` - Application state containing services
24///
25/// # Returns
26/// * `Result<(), Error>` - Success or failure of swap processing
27#[instrument(
28    level = "debug",
29    skip(job, context),
30    fields(
31        request_id = ?job.request_id,
32        job_id = %job.message_id,
33        job_type = %job.job_type.to_string(),
34        attempt = %attempt.current(),
35        relayer_id = %job.data.relayer_id,
36    )
37)]
38pub async fn token_swap_request_handler(
39    job: Job<TokenSwapRequest>,
40    context: Data<ThinData<DefaultAppState>>,
41    attempt: Attempt,
42) -> std::result::Result<(), Error> {
43    if let Some(request_id) = job.request_id.clone() {
44        set_request_id(request_id);
45    }
46
47    debug!(relayer_id = %job.data.relayer_id, "handling token swap request");
48
49    let result = handle_request(job.data, context).await;
50
51    handle_result(
52        result,
53        attempt,
54        "TokenSwapRequest",
55        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
56    )
57}
58
59#[derive(Default, Debug, Clone)]
60pub struct TokenSwapCronReminder();
61
62/// Handles incoming swap jobs from the cron queue.
63#[instrument(
64    level = "info",
65    skip(_job, data, relayer_id),
66    fields(
67        job_type = "token_swap_cron",
68        attempt = %attempt.current(),
69    ),
70    err
71)]
72pub async fn token_swap_cron_handler(
73    _job: TokenSwapCronReminder,
74    relayer_id: Data<String>,
75    data: Data<ThinData<DefaultAppState>>,
76    attempt: Attempt,
77) -> std::result::Result<(), Error> {
78    info!(
79        relayer_id = %*relayer_id,
80        "handling token swap cron request"
81    );
82
83    let result = handle_request(
84        TokenSwapRequest {
85            relayer_id: relayer_id.to_string(),
86        },
87        data,
88    )
89    .await;
90
91    handle_result(
92        result,
93        attempt,
94        "TokenSwapRequest",
95        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
96    )
97}
98
99async fn handle_request(
100    request: TokenSwapRequest,
101    context: Data<ThinData<DefaultAppState>>,
102) -> EyreResult<()> {
103    debug!(relayer_id = %request.relayer_id, "processing token swap");
104
105    let relayer = get_network_relayer(request.relayer_id.clone(), &context).await?;
106
107    relayer
108        .handle_token_swap_request(request.relayer_id.clone())
109        .await
110        .map_err(|e| eyre::eyre!("Failed to handle token swap request: {}", e))?;
111
112    Ok(())
113}
114
115#[cfg(test)]
116mod tests {}