openzeppelin_relayer/services/plugins/
mod.rs

//! Plugin service module: handles plugin execution and interaction with the relayer.
2
3use std::{fmt, sync::Arc};
4
5use crate::observability::request_id::get_request_id;
6use crate::{
7    jobs::JobProducerTrait,
8    models::{
9        AppState, NetworkRepoModel, NotificationRepoModel, PluginCallRequest, PluginMetadata,
10        PluginModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
11    },
12    repositories::{
13        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14        Repository, TransactionCounterTrait, TransactionRepository,
15    },
16};
17use actix_web::web;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use uuid::Uuid;
22
23pub mod runner;
24pub use runner::*;
25
26pub mod relayer_api;
27pub use relayer_api::*;
28
29pub mod script_executor;
30pub use script_executor::*;
31
32pub mod socket;
33pub use socket::*;
34
35#[cfg(test)]
36use mockall::automock;
37
/// Errors that can be produced while running a plugin.
///
/// The `Display` text of each variant is the prefix shown in its `#[error]`
/// attribute followed by the carried value. The enum also derives `Serialize`.
#[derive(Error, Debug, Serialize)]
pub enum PluginError {
    /// Socket-related failure; the string carries the detail.
    #[error("Socket error: {0}")]
    SocketError(String),
    /// Generic plugin failure; the string carries the detail.
    #[error("Plugin error: {0}")]
    PluginError(String),
    /// Relayer-related failure; the string carries the detail.
    #[error("Relayer error: {0}")]
    RelayerError(String),
    /// Failure while executing the plugin; the string carries the detail.
    #[error("Plugin execution error: {0}")]
    PluginExecutionError(String),
    /// The script exceeded its time budget; the value is the timeout in seconds.
    #[error("Script execution timed out after {0} seconds")]
    ScriptTimeout(u64),
    /// An unsupported method was requested; the string carries the detail.
    #[error("Invalid method: {0}")]
    InvalidMethod(String),
    /// The supplied payload was rejected; the string carries the detail.
    #[error("Invalid payload: {0}")]
    InvalidPayload(String),
    /// Structured error payload (status, message, optional code/details/logs/traces).
    ///
    /// Displays as the payload's `message`. `PluginService::call_plugin` reports
    /// this variant as `PluginCallResult::Handler`; every other variant is
    /// reported as `PluginCallResult::Fatal`.
    #[error("{0}")]
    HandlerError(Box<PluginHandlerPayload>),
}
57
58impl PluginError {
59    /// Enriches the error with traces if it's a HandlerError variant.
60    /// For other variants, returns the error unchanged.
61    pub fn with_traces(self, traces: Vec<serde_json::Value>) -> Self {
62        match self {
63            PluginError::HandlerError(mut payload) => {
64                payload.append_traces(traces);
65                PluginError::HandlerError(payload)
66            }
67            other => other,
68        }
69    }
70}
71
72impl From<PluginError> for String {
73    fn from(error: PluginError) -> Self {
74        error.to_string()
75    }
76}
77
/// Body returned for a successful plugin call.
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
pub struct PluginCallResponse {
    /// The plugin result, parsed as JSON when possible; otherwise a string
    pub result: serde_json::Value,
    /// Optional metadata captured during plugin execution (logs/traces)
    // Omitted from the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<PluginMetadata>,
}
86
/// Optional error code and details attached to a handler error response.
///
/// Both fields are skipped during serialization when they are `None`.
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
pub struct PluginHandlerError {
    /// Error code, if one was provided.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Arbitrary JSON details, if any were provided.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
}
94
/// Handler error in the form produced by `PluginHandlerPayload::into_response`.
#[derive(Debug)]
pub struct PluginHandlerResponse {
    /// Status carried over from the payload (presumably an HTTP status code — confirm with callers).
    pub status: u16,
    /// The payload message, or a fallback derived from logs when the message is blank.
    pub message: String,
    /// Code and details carried over from the payload.
    pub error: PluginHandlerError,
    /// Logs/traces, present only when at least one of them is emitted.
    pub metadata: Option<PluginMetadata>,
}
102
/// Structured error payload carried by `PluginError::HandlerError`.
///
/// Optional fields are skipped during serialization when they are `None`.
#[derive(Debug, Serialize)]
pub struct PluginHandlerPayload {
    /// Status associated with the error (presumably an HTTP status code — confirm with callers).
    pub status: u16,
    /// Error message; may be blank, in which case a fallback is derived from logs.
    pub message: String,
    /// Error code, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Arbitrary JSON details, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
    /// Log entries captured alongside the error, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logs: Option<Vec<LogEntry>>,
    /// Trace values captured alongside the error, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub traces: Option<Vec<serde_json::Value>>,
}
116
117impl PluginHandlerPayload {
118    fn append_traces(&mut self, traces: Vec<serde_json::Value>) {
119        match &mut self.traces {
120            Some(existing) => existing.extend(traces),
121            None => self.traces = Some(traces),
122        }
123    }
124
125    fn into_response(self, emit_logs: bool, emit_traces: bool) -> PluginHandlerResponse {
126        let logs = if emit_logs { self.logs } else { None };
127        let traces = if emit_traces { self.traces } else { None };
128        let message = derive_handler_message(&self.message, logs.as_deref());
129        let metadata = build_metadata(logs, traces);
130
131        PluginHandlerResponse {
132            status: self.status,
133            message,
134            error: PluginHandlerError {
135                code: self.code,
136                details: self.details,
137            },
138            metadata,
139        }
140    }
141}
142
143impl fmt::Display for PluginHandlerPayload {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.write_str(&self.message)
146    }
147}
148
149fn derive_handler_message(message: &str, logs: Option<&[LogEntry]>) -> String {
150    if !message.trim().is_empty() {
151        return message.to_string();
152    }
153
154    if let Some(logs) = logs {
155        if let Some(entry) = logs
156            .iter()
157            .rev()
158            .find(|entry| matches!(entry.level, LogLevel::Error | LogLevel::Warn))
159        {
160            return entry.message.clone();
161        }
162
163        if let Some(entry) = logs.last() {
164            return entry.message.clone();
165        }
166    }
167
168    "Plugin execution failed".to_string()
169}
170
171fn build_metadata(
172    logs: Option<Vec<LogEntry>>,
173    traces: Option<Vec<serde_json::Value>>,
174) -> Option<PluginMetadata> {
175    if logs.is_some() || traces.is_some() {
176        Some(PluginMetadata { logs, traces })
177    } else {
178        None
179    }
180}
181
/// Outcome of `PluginService::call_plugin`.
#[derive(Debug)]
pub enum PluginCallResult {
    /// The runner returned a script result.
    Success(PluginCallResponse),
    /// The runner returned `PluginError::HandlerError`; holds the converted response.
    Handler(PluginHandlerResponse),
    /// The runner returned any other `PluginError` (execution/infrastructure failure).
    Fatal(PluginError),
}
188
/// Service that executes plugins through a [`PluginRunnerTrait`] implementation.
#[derive(Default)]
pub struct PluginService<R: PluginRunnerTrait> {
    /// Runner that performs the actual script execution.
    runner: R,
}
193
194impl<R: PluginRunnerTrait> PluginService<R> {
195    pub fn new(runner: R) -> Self {
196        Self { runner }
197    }
198
199    fn resolve_plugin_path(plugin_path: &str) -> String {
200        if plugin_path.starts_with("plugins/") {
201            plugin_path.to_string()
202        } else {
203            format!("plugins/{plugin_path}")
204        }
205    }
206
207    #[allow(clippy::type_complexity)]
208    async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
209        &self,
210        plugin: PluginModel,
211        plugin_call_request: PluginCallRequest,
212        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
213    ) -> PluginCallResult
214    where
215        J: JobProducerTrait + Send + Sync + 'static,
216        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
217        TR: TransactionRepository
218            + Repository<TransactionRepoModel, String>
219            + Send
220            + Sync
221            + 'static,
222        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
223        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
224        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
225        TCR: TransactionCounterTrait + Send + Sync + 'static,
226        PR: PluginRepositoryTrait + Send + Sync + 'static,
227        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
228    {
229        let socket_path = format!("/tmp/{}.sock", Uuid::new_v4());
230        let script_path = Self::resolve_plugin_path(&plugin.path);
231        let script_params = plugin_call_request.params.to_string();
232        let headers_json = plugin_call_request
233            .headers
234            .map(|h| serde_json::to_string(&h).unwrap_or_default());
235
236        let result = self
237            .runner
238            .run(
239                plugin.id.clone(),
240                &socket_path,
241                script_path,
242                plugin.timeout,
243                script_params,
244                get_request_id(),
245                headers_json,
246                state,
247            )
248            .await;
249
250        match result {
251            Ok(script_result) => {
252                // Include logs/traces only if enabled via plugin config
253                let logs = if plugin.emit_logs {
254                    Some(script_result.logs)
255                } else {
256                    None
257                };
258                let traces = if plugin.emit_traces {
259                    Some(script_result.trace)
260                } else {
261                    None
262                };
263                let metadata = build_metadata(logs, traces);
264
265                // Parse return_value string into JSON when possible; otherwise string
266                let result = if script_result.return_value.trim() == "undefined" {
267                    serde_json::Value::Null
268                } else {
269                    serde_json::from_str::<serde_json::Value>(&script_result.return_value)
270                        .unwrap_or(serde_json::Value::String(script_result.return_value))
271                };
272
273                PluginCallResult::Success(PluginCallResponse { result, metadata })
274            }
275            Err(e) => match e {
276                PluginError::HandlerError(payload) => {
277                    let failure = payload.into_response(plugin.emit_logs, plugin.emit_traces);
278                    let has_logs = failure
279                        .metadata
280                        .as_ref()
281                        .and_then(|meta| meta.logs.as_ref())
282                        .is_some();
283                    let has_traces = failure
284                        .metadata
285                        .as_ref()
286                        .and_then(|meta| meta.traces.as_ref())
287                        .is_some();
288
289                    tracing::debug!(
290                        status = failure.status,
291                        message = %failure.message,
292                        code = ?failure.error.code.as_ref(),
293                        details = ?failure.error.details.as_ref(),
294                        has_logs,
295                        has_traces,
296                        "Plugin handler returned error"
297                    );
298
299                    PluginCallResult::Handler(failure)
300                }
301                other => {
302                    // This is an actual execution/infrastructure failure
303                    tracing::error!("Plugin execution failed: {:?}", other);
304                    PluginCallResult::Fatal(other)
305                }
306            },
307        }
308    }
309}
310
/// Abstraction over the plugin service; a mock is generated under `cfg(test)` via `automock`.
///
/// Note the generic parameter order: the trait lists `TR` before `RR`, while
/// `AppState` is instantiated as `AppState<J, RR, TR, ...>`.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>: Send + Sync
where
    J: JobProducerTrait + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    /// Builds the service around the given runner.
    fn new(runner: PluginRunner) -> Self;
    /// Executes `plugin` with `plugin_call_request` against the shared application
    /// state and returns the classified outcome.
    async fn call_plugin(
        &self,
        plugin: PluginModel,
        plugin_call_request: PluginCallRequest,
        state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
    ) -> PluginCallResult;
}
333
/// Trait implementation for the production runner; both methods delegate to the
/// inherent items of the same name on `PluginService`.
#[async_trait]
impl<J, TR, RR, NR, NFR, SR, TCR, PR, AKR> PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>
    for PluginService<PluginRunner>
where
    J: JobProducerTrait + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    fn new(runner: PluginRunner) -> Self {
        // Resolves to the inherent `PluginService::new` (inherent items take
        // precedence over trait items), so this does not recurse.
        Self::new(runner)
    }

    async fn call_plugin(
        &self,
        plugin: PluginModel,
        plugin_call_request: PluginCallRequest,
        state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
    ) -> PluginCallResult {
        // Method resolution picks the inherent `call_plugin` first, so this
        // forwards to the real implementation rather than calling itself.
        self.call_plugin(plugin, plugin_call_request, state).await
    }
}
361
362#[cfg(test)]
363mod tests {
364    use std::time::Duration;
365
366    use crate::{
367        constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
368        jobs::MockJobProducerTrait,
369        models::PluginModel,
370        repositories::{
371            ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
372            PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
373            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
374        },
375        utils::mocks::mockutils::create_mock_app_state,
376    };
377
378    use super::*;
379
380    #[test]
381    fn test_resolve_plugin_path() {
382        assert_eq!(
383            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("plugins/examples/test.ts"),
384            "plugins/examples/test.ts"
385        );
386
387        assert_eq!(
388            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("examples/test.ts"),
389            "plugins/examples/test.ts"
390        );
391
392        assert_eq!(
393            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("test.ts"),
394            "plugins/test.ts"
395        );
396    }
397
398    #[tokio::test]
399    async fn test_call_plugin() {
400        let plugin = PluginModel {
401            id: "test-plugin".to_string(),
402            path: "test-path".to_string(),
403            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
404            emit_logs: true,
405            emit_traces: false,
406        };
407        let app_state =
408            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
409
410        let mut plugin_runner = MockPluginRunnerTrait::default();
411
412        plugin_runner
413            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
414            .returning(|_, _, _, _, _, _, _, _| {
415                Ok(ScriptResult {
416                    logs: vec![LogEntry {
417                        level: LogLevel::Log,
418                        message: "test-log".to_string(),
419                    }],
420                    error: "test-error".to_string(),
421                    return_value: "test-result".to_string(),
422                    trace: Vec::new(),
423                })
424            });
425
426        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
427        let outcome = plugin_service
428            .call_plugin(
429                plugin,
430                PluginCallRequest {
431                    params: serde_json::Value::Null,
432                    headers: None,
433                },
434                Arc::new(web::ThinData(app_state)),
435            )
436            .await;
437        match outcome {
438            PluginCallResult::Success(result) => {
439                // result should be the string since it is not JSON
440                assert_eq!(
441                    result.result,
442                    serde_json::Value::String("test-result".to_string())
443                );
444                // emit_logs=true -> logs should be present in metadata
445                assert!(result.metadata.and_then(|meta| meta.logs).is_some());
446            }
447            PluginCallResult::Handler(_) | PluginCallResult::Fatal(_) => {
448                panic!("expected success outcome")
449            }
450        }
451    }
452
453    #[tokio::test]
454    async fn test_from_plugin_error_to_string() {
455        let error = PluginError::PluginExecutionError("test-error".to_string());
456        let result: String = error.into();
457        assert_eq!(result, "Plugin execution error: test-error");
458    }
459
460    #[test]
461    fn test_plugin_error_with_traces_handler_error() {
462        let payload = PluginHandlerPayload {
463            status: 400,
464            message: "test message".to_string(),
465            code: Some("TEST_CODE".to_string()),
466            details: None,
467            logs: None,
468            traces: Some(vec![serde_json::json!({"trace": "1"})]),
469        };
470        let error = PluginError::HandlerError(Box::new(payload));
471        let new_traces = vec![
472            serde_json::json!({"trace": "2"}),
473            serde_json::json!({"trace": "3"}),
474        ];
475
476        let enriched_error = error.with_traces(new_traces);
477
478        match enriched_error {
479            PluginError::HandlerError(payload) => {
480                let traces = payload.traces.unwrap();
481                assert_eq!(traces.len(), 3);
482                assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
483                assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
484                assert_eq!(traces[2], serde_json::json!({"trace": "3"}));
485            }
486            _ => panic!("Expected HandlerError variant"),
487        }
488    }
489
490    #[test]
491    fn test_plugin_error_with_traces_other_variants() {
492        let error = PluginError::PluginExecutionError("test".to_string());
493        let new_traces = vec![serde_json::json!({"trace": "1"})];
494
495        let result = error.with_traces(new_traces);
496
497        match result {
498            PluginError::PluginExecutionError(msg) => assert_eq!(msg, "test"),
499            _ => panic!("Expected PluginExecutionError variant"),
500        }
501    }
502
503    #[test]
504    fn test_derive_handler_message_with_message() {
505        let result = derive_handler_message("Custom error message", None);
506        assert_eq!(result, "Custom error message");
507    }
508
509    #[test]
510    fn test_derive_handler_message_with_error_log() {
511        let logs = vec![
512            LogEntry {
513                level: LogLevel::Log,
514                message: "info log".to_string(),
515            },
516            LogEntry {
517                level: LogLevel::Error,
518                message: "error log".to_string(),
519            },
520        ];
521        let result = derive_handler_message("", Some(&logs));
522        assert_eq!(result, "error log");
523    }
524
525    #[test]
526    fn test_derive_handler_message_with_warn_log() {
527        let logs = vec![
528            LogEntry {
529                level: LogLevel::Log,
530                message: "info log".to_string(),
531            },
532            LogEntry {
533                level: LogLevel::Warn,
534                message: "warn log".to_string(),
535            },
536        ];
537        let result = derive_handler_message("", Some(&logs));
538        assert_eq!(result, "warn log");
539    }
540
541    #[test]
542    fn test_derive_handler_message_with_only_info_logs() {
543        let logs = vec![
544            LogEntry {
545                level: LogLevel::Log,
546                message: "first log".to_string(),
547            },
548            LogEntry {
549                level: LogLevel::Info,
550                message: "last log".to_string(),
551            },
552        ];
553        let result = derive_handler_message("", Some(&logs));
554        assert_eq!(result, "last log");
555    }
556
557    #[test]
558    fn test_derive_handler_message_no_logs() {
559        let result = derive_handler_message("", None);
560        assert_eq!(result, "Plugin execution failed");
561    }
562
563    #[test]
564    fn test_build_metadata_with_logs_and_traces() {
565        let logs = vec![LogEntry {
566            level: LogLevel::Log,
567            message: "test".to_string(),
568        }];
569        let traces = vec![serde_json::json!({"trace": "1"})];
570
571        let result = build_metadata(Some(logs.clone()), Some(traces.clone()));
572
573        assert!(result.is_some());
574        let metadata = result.unwrap();
575        assert_eq!(metadata.logs.unwrap(), logs);
576        assert_eq!(metadata.traces.unwrap(), traces);
577    }
578
579    #[test]
580    fn test_build_metadata_with_only_logs() {
581        let logs = vec![LogEntry {
582            level: LogLevel::Log,
583            message: "test".to_string(),
584        }];
585
586        let result = build_metadata(Some(logs.clone()), None);
587
588        assert!(result.is_some());
589        let metadata = result.unwrap();
590        assert_eq!(metadata.logs.unwrap(), logs);
591        assert!(metadata.traces.is_none());
592    }
593
594    #[test]
595    fn test_build_metadata_with_only_traces() {
596        let traces = vec![serde_json::json!({"trace": "1"})];
597
598        let result = build_metadata(None, Some(traces.clone()));
599
600        assert!(result.is_some());
601        let metadata = result.unwrap();
602        assert!(metadata.logs.is_none());
603        assert_eq!(metadata.traces.unwrap(), traces);
604    }
605
606    #[test]
607    fn test_build_metadata_with_neither() {
608        let result = build_metadata(None, None);
609        assert!(result.is_none());
610    }
611
612    #[test]
613    fn test_plugin_handler_payload_append_traces_to_existing() {
614        let mut payload = PluginHandlerPayload {
615            status: 400,
616            message: "test".to_string(),
617            code: None,
618            details: None,
619            logs: None,
620            traces: Some(vec![serde_json::json!({"trace": "1"})]),
621        };
622
623        payload.append_traces(vec![serde_json::json!({"trace": "2"})]);
624
625        let traces = payload.traces.unwrap();
626        assert_eq!(traces.len(), 2);
627        assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
628        assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
629    }
630
631    #[test]
632    fn test_plugin_handler_payload_append_traces_to_none() {
633        let mut payload = PluginHandlerPayload {
634            status: 400,
635            message: "test".to_string(),
636            code: None,
637            details: None,
638            logs: None,
639            traces: None,
640        };
641
642        payload.append_traces(vec![serde_json::json!({"trace": "1"})]);
643
644        let traces = payload.traces.unwrap();
645        assert_eq!(traces.len(), 1);
646        assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
647    }
648
649    #[test]
650    fn test_plugin_handler_payload_into_response_with_logs_and_traces() {
651        let logs = vec![LogEntry {
652            level: LogLevel::Error,
653            message: "error message".to_string(),
654        }];
655        let payload = PluginHandlerPayload {
656            status: 400,
657            message: "".to_string(),
658            code: Some("ERR_CODE".to_string()),
659            details: Some(serde_json::json!({"key": "value"})),
660            logs: Some(logs.clone()),
661            traces: Some(vec![serde_json::json!({"trace": "1"})]),
662        };
663
664        let response = payload.into_response(true, true);
665
666        assert_eq!(response.status, 400);
667        assert_eq!(response.message, "error message"); // Derived from error log
668        assert_eq!(response.error.code, Some("ERR_CODE".to_string()));
669        assert!(response.metadata.is_some());
670        let metadata = response.metadata.unwrap();
671        assert_eq!(metadata.logs.unwrap(), logs);
672        assert_eq!(metadata.traces.unwrap().len(), 1);
673    }
674
675    #[test]
676    fn test_plugin_handler_payload_into_response_without_logs() {
677        let logs = vec![LogEntry {
678            level: LogLevel::Log,
679            message: "test log".to_string(),
680        }];
681        let payload = PluginHandlerPayload {
682            status: 500,
683            message: "explicit message".to_string(),
684            code: None,
685            details: None,
686            logs: Some(logs),
687            traces: None,
688        };
689
690        let response = payload.into_response(false, false);
691
692        assert_eq!(response.status, 500);
693        assert_eq!(response.message, "explicit message");
694        assert!(response.metadata.is_none()); // emit_logs=false, emit_traces=false
695    }
696
697    #[tokio::test]
698    async fn test_call_plugin_handler_error() {
699        let plugin = PluginModel {
700            id: "test-plugin".to_string(),
701            path: "test-path".to_string(),
702            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
703            emit_logs: true,
704            emit_traces: true,
705        };
706        let app_state =
707            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
708
709        let mut plugin_runner = MockPluginRunnerTrait::default();
710
711        plugin_runner
712            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
713            .returning(move |_, _, _, _, _, _, _, _| {
714                Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
715                    status: 400,
716                    message: "Plugin handler error".to_string(),
717                    code: Some("VALIDATION_ERROR".to_string()),
718                    details: Some(serde_json::json!({"field": "email"})),
719                    logs: Some(vec![LogEntry {
720                        level: LogLevel::Error,
721                        message: "Invalid email".to_string(),
722                    }]),
723                    traces: Some(vec![serde_json::json!({"step": "validation"})]),
724                })))
725            });
726
727        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
728        let outcome = plugin_service
729            .call_plugin(
730                plugin,
731                PluginCallRequest {
732                    params: serde_json::Value::Null,
733                    headers: None,
734                },
735                Arc::new(web::ThinData(app_state)),
736            )
737            .await;
738
739        match outcome {
740            PluginCallResult::Handler(response) => {
741                assert_eq!(response.status, 400);
742                assert_eq!(response.error.code, Some("VALIDATION_ERROR".to_string()));
743                assert!(response.metadata.is_some());
744                let metadata = response.metadata.unwrap();
745                assert!(metadata.logs.is_some());
746                assert!(metadata.traces.is_some());
747            }
748            _ => panic!("Expected Handler result"),
749        }
750    }
751
752    #[tokio::test]
753    async fn test_call_plugin_fatal_error() {
754        let plugin = PluginModel {
755            id: "test-plugin".to_string(),
756            path: "test-path".to_string(),
757            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
758            emit_logs: false,
759            emit_traces: false,
760        };
761        let app_state =
762            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
763
764        let mut plugin_runner = MockPluginRunnerTrait::default();
765
766        plugin_runner
767            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
768            .returning(|_, _, _, _, _, _, _, _| {
769                Err(PluginError::PluginExecutionError("Fatal error".to_string()))
770            });
771
772        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
773        let outcome = plugin_service
774            .call_plugin(
775                plugin,
776                PluginCallRequest {
777                    params: serde_json::Value::Null,
778                    headers: None,
779                },
780                Arc::new(web::ThinData(app_state)),
781            )
782            .await;
783
784        match outcome {
785            PluginCallResult::Fatal(error) => {
786                assert!(matches!(error, PluginError::PluginExecutionError(_)));
787            }
788            _ => panic!("Expected Fatal result"),
789        }
790    }
791
792    #[tokio::test]
793    async fn test_call_plugin_success_with_json_result() {
794        let plugin = PluginModel {
795            id: "test-plugin".to_string(),
796            path: "test-path".to_string(),
797            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
798            emit_logs: true,
799            emit_traces: true,
800        };
801        let app_state =
802            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
803
804        let mut plugin_runner = MockPluginRunnerTrait::default();
805
806        plugin_runner
807            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
808            .returning(|_, _, _, _, _, _, _, _| {
809                Ok(ScriptResult {
810                    logs: vec![LogEntry {
811                        level: LogLevel::Log,
812                        message: "test-log".to_string(),
813                    }],
814                    error: "".to_string(),
815                    return_value: r#"{"result": "success"}"#.to_string(),
816                    trace: vec![serde_json::json!({"step": "1"})],
817                })
818            });
819
820        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
821        let outcome = plugin_service
822            .call_plugin(
823                plugin,
824                PluginCallRequest {
825                    params: serde_json::Value::Null,
826                    headers: None,
827                },
828                Arc::new(web::ThinData(app_state)),
829            )
830            .await;
831
832        match outcome {
833            PluginCallResult::Success(result) => {
834                // Should be parsed as JSON object
835                assert_eq!(result.result, serde_json::json!({"result": "success"}));
836                assert!(result.metadata.is_some());
837                let metadata = result.metadata.unwrap();
838                assert!(metadata.logs.is_some());
839                assert!(metadata.traces.is_some());
840            }
841            _ => panic!("Expected Success result"),
842        }
843    }
844
845    #[tokio::test]
846    async fn test_call_plugin_success_with_undefined_result() {
847        let plugin = PluginModel {
848            id: "test-plugin".to_string(),
849            path: "test-path".to_string(),
850            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
851            emit_logs: false,
852            emit_traces: false,
853        };
854        let app_state =
855            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
856
857        let mut plugin_runner = MockPluginRunnerTrait::default();
858
859        plugin_runner
860            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
861            .returning(|_, _, _, _, _, _, _, _| {
862                Ok(ScriptResult {
863                    logs: vec![],
864                    error: "".to_string(),
865                    return_value: "undefined".to_string(),
866                    trace: vec![],
867                })
868            });
869
870        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
871        let outcome = plugin_service
872            .call_plugin(
873                plugin,
874                PluginCallRequest {
875                    params: serde_json::Value::Null,
876                    headers: None,
877                },
878                Arc::new(web::ThinData(app_state)),
879            )
880            .await;
881
882        match outcome {
883            PluginCallResult::Success(result) => {
884                // "undefined" should be converted to null
885                assert_eq!(result.result, serde_json::Value::Null);
886                // emit_logs=false, emit_traces=false -> no metadata
887                assert!(result.metadata.is_none());
888            }
889            _ => panic!("Expected Success result"),
890        }
891    }
892
893    #[tokio::test]
894    async fn test_call_plugin_with_headers() {
895        use std::sync::{Arc as StdArc, Mutex};
896
897        let plugin = PluginModel {
898            id: "test-plugin".to_string(),
899            path: "test-path".to_string(),
900            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
901            emit_logs: false,
902            emit_traces: false,
903        };
904        let app_state =
905            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
906
907        // Capture the headers_json parameter passed to the runner
908        let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
909        let captured_headers_clone = captured_headers.clone();
910
911        let mut plugin_runner = MockPluginRunnerTrait::default();
912
913        plugin_runner
914            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
915            .returning(move |_, _, _, _, _, _, headers_json, _| {
916                // Capture the headers_json parameter
917                *captured_headers_clone.lock().unwrap() = headers_json;
918                Ok(ScriptResult {
919                    logs: vec![],
920                    error: "".to_string(),
921                    return_value: "{}".to_string(),
922                    trace: vec![],
923                })
924            });
925
926        // Create request with headers
927        let mut headers_map = std::collections::HashMap::new();
928        headers_map.insert(
929            "x-custom-header".to_string(),
930            vec!["custom-value".to_string()],
931        );
932        headers_map.insert(
933            "authorization".to_string(),
934            vec!["Bearer token123".to_string()],
935        );
936
937        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
938        let _outcome = plugin_service
939            .call_plugin(
940                plugin,
941                PluginCallRequest {
942                    params: serde_json::json!({"test": "data"}),
943                    headers: Some(headers_map.clone()),
944                },
945                Arc::new(web::ThinData(app_state)),
946            )
947            .await;
948
949        // Verify headers were serialized and passed to the runner
950        let captured = captured_headers.lock().unwrap();
951        assert!(
952            captured.is_some(),
953            "headers_json should be passed to runner"
954        );
955
956        let headers_json = captured.as_ref().unwrap();
957        let parsed: std::collections::HashMap<String, Vec<String>> =
958            serde_json::from_str(headers_json).expect("headers_json should be valid JSON");
959
960        assert_eq!(
961            parsed.get("x-custom-header"),
962            Some(&vec!["custom-value".to_string()])
963        );
964        assert_eq!(
965            parsed.get("authorization"),
966            Some(&vec!["Bearer token123".to_string()])
967        );
968    }
969
970    #[tokio::test]
971    async fn test_call_plugin_without_headers() {
972        use std::sync::{Arc as StdArc, Mutex};
973
974        let plugin = PluginModel {
975            id: "test-plugin".to_string(),
976            path: "test-path".to_string(),
977            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
978            emit_logs: false,
979            emit_traces: false,
980        };
981        let app_state =
982            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
983
984        let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
985        let captured_headers_clone = captured_headers.clone();
986
987        let mut plugin_runner = MockPluginRunnerTrait::default();
988
989        plugin_runner
990            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
991            .returning(move |_, _, _, _, _, _, headers_json, _| {
992                *captured_headers_clone.lock().unwrap() = headers_json;
993                Ok(ScriptResult {
994                    logs: vec![],
995                    error: "".to_string(),
996                    return_value: "{}".to_string(),
997                    trace: vec![],
998                })
999            });
1000
1001        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1002        let _outcome = plugin_service
1003            .call_plugin(
1004                plugin,
1005                PluginCallRequest {
1006                    params: serde_json::json!({}),
1007                    headers: None, // No headers
1008                },
1009                Arc::new(web::ThinData(app_state)),
1010            )
1011            .await;
1012
1013        // Verify headers_json is None when no headers provided
1014        let captured = captured_headers.lock().unwrap();
1015        assert!(
1016            captured.is_none(),
1017            "headers_json should be None when no headers provided"
1018        );
1019    }
1020}