Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions devolutions-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sha2 = "0.10"
serde_json = "1"
serde = { version = "1", features = ["derive"] }
tap = "1.0"
tempfile = "3"
tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12", "ring"] }
tracing = "0.1"
Expand Down Expand Up @@ -98,8 +99,5 @@ features = [
[target.'cfg(windows)'.build-dependencies]
embed-resource = "3.0"

[dev-dependencies]
tempfile = "3"

[target.'cfg(windows)'.dev-dependencies]
expect-test = "1.5"
18 changes: 14 additions & 4 deletions devolutions-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io::BufReader;
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context, bail};
use anyhow::{Context as _, bail};
use camino::{Utf8Path, Utf8PathBuf};
use devolutions_agent_shared::{default_schedule_window_start, get_data_dir};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -402,7 +402,10 @@ pub mod dto {
pub schedule: Option<UpdaterSchedule>,
}

#[allow(clippy::derivable_impls)] // Just to be explicit about the default values of the config.
#[expect(
clippy::derivable_impls,
reason = "manually implemented so we are being explicit about the defaults"
)]
impl Default for UpdaterConf {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -471,7 +474,10 @@ pub mod dto {
pub enabled: bool,
}

#[allow(clippy::derivable_impls)] // Just to be explicit about the default values of the config.
#[expect(
clippy::derivable_impls,
reason = "manually implemented so we are being explicit about the defaults"
)]
impl Default for SessionConf {
fn default() -> Self {
Self { enabled: false }
Expand Down Expand Up @@ -525,6 +531,7 @@ pub mod dto {
pub server_spki_sha256: Option<String>,
}

/// PowerShell Universal Event Hub compatibility configuration.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct PsuEventHubConf {
Expand All @@ -544,7 +551,10 @@ pub mod dto {
pub powershell: PsuPowerShellConf,
}

#[allow(clippy::derivable_impls)] // Just to be explicit about default disabled behavior.
#[expect(
clippy::derivable_impls,
reason = "manually implemented so we are being explicit about the defaults"
)]
impl Default for PsuEventHubConf {
fn default() -> Self {
Self {
Expand Down
36 changes: 24 additions & 12 deletions devolutions-agent/src/psu_event_hub/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::sync::Arc;
use anyhow::Context as _;
use camino::Utf8PathBuf;
use serde_json::Value;
use tokio::task::JoinSet;
use uuid::Uuid;

use crate::config::dto::{PsuEventHubConnectionConf, PsuPowerShellConf};
use crate::config::dto::PsuEventHubConnectionConf;
use crate::psu_event_hub::models::WebsocketEventResponse;
use crate::psu_event_hub::powershell_worker::PowerShellWorker;
use crate::psu_event_hub::result_store::ResultStore;
Expand All @@ -19,16 +20,21 @@ pub(super) struct EventHubExecutor {
}

impl EventHubExecutor {
pub(super) fn new(connection: &PsuEventHubConnectionConf, power_shell: PsuPowerShellConf) -> Self {
pub(super) fn new(connection: &PsuEventHubConnectionConf, worker: Arc<PowerShellWorker>) -> Self {
Self {
hub: connection.hub.clone(),
script_path: connection.script_path.as_ref().map(normalize_script_path),
worker: Arc::new(PowerShellWorker::new(power_shell)),
worker,
result_store: ResultStore::default(),
}
}

pub(super) fn handle_invocation(&self, target: &str, arguments: &[Value]) -> anyhow::Result<Option<Value>> {
pub(super) fn handle_invocation(
&self,
target: &str,
arguments: &[Value],
execution_tasks: &mut JoinSet<()>,
) -> anyhow::Result<Option<Value>> {
if target == "GetResult" {
let execution_id = required_string_argument(arguments, 0, "event id")?;
let result = self.result_store.take(execution_id);
Expand All @@ -39,41 +45,47 @@ impl EventHubExecutor {

if target == self.hub {
let data = required_string_argument(arguments, 0, "event data")?.to_owned();
let execution_id = self.execute_script(data, true);
let execution_id = self.execute_script(data, true, execution_tasks);
return Ok(Some(Value::String(execution_id)));
}

if target == format!("{}Void", self.hub) {
let data = required_string_argument(arguments, 0, "event data")?.to_owned();
self.execute_script(data, false);
self.execute_script(data, false, execution_tasks);
return Ok(None);
}

if target == format!("{}Module", self.hub) {
let command = required_string_argument(arguments, 0, "command")?.to_owned();
let data = required_string_argument(arguments, 1, "event data")?.to_owned();
let execution_id = self.execute_command(command, data, true);
let execution_id = self.execute_command(command, data, true, execution_tasks);
return Ok(Some(Value::String(execution_id)));
}

if target == format!("{}ModuleVoid", self.hub) {
let command = required_string_argument(arguments, 0, "command")?.to_owned();
let data = required_string_argument(arguments, 1, "event data")?.to_owned();
self.execute_command(command, data, false);
self.execute_command(command, data, false, execution_tasks);
return Ok(None);
}

warn!(%target, hub = %self.hub, "Received unknown PSU Event Hub invocation");
Ok(None)
}

fn execute_command(&self, command: String, data: String, return_result: bool) -> String {
fn execute_command(
&self,
command: String,
data: String,
return_result: bool,
execution_tasks: &mut JoinSet<()>,
) -> String {
let execution_id = Uuid::new_v4().to_string();
let worker = Arc::clone(&self.worker);
let result_store = self.result_store.clone();
let stored_execution_id = execution_id.clone();

tokio::spawn(async move {
execution_tasks.spawn(async move {
match worker.execute_command(command, data, return_result).await {
Ok(response) if return_result => result_store.insert(stored_execution_id, response),
Ok(_) => {}
Expand All @@ -90,7 +102,7 @@ impl EventHubExecutor {
execution_id
}

fn execute_script(&self, data: String, return_result: bool) -> String {
fn execute_script(&self, data: String, return_result: bool, execution_tasks: &mut JoinSet<()>) -> String {
let execution_id = Uuid::new_v4().to_string();
let Some(script_path) = self.script_path.clone() else {
if return_result {
Expand All @@ -106,7 +118,7 @@ impl EventHubExecutor {
let result_store = self.result_store.clone();
let stored_execution_id = execution_id.clone();

tokio::spawn(async move {
execution_tasks.spawn(async move {
match worker.execute_script(script_path, data, return_result).await {
Ok(response) if return_result => result_store.insert(stored_execution_id, response),
Ok(_) => {}
Expand Down
56 changes: 52 additions & 4 deletions devolutions-agent/src/psu_event_hub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ mod powershell_worker;
mod result_store;
mod signalr;

use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use devolutions_gateway_task::{ShutdownSignal, Task};
use tokio::task::JoinSet;

use crate::config::ConfHandle;
use crate::config::{ConfHandle, dto};
use crate::psu_event_hub::executor::EventHubExecutor;
use crate::psu_event_hub::powershell_worker::PowerShellWorker;

Expand Down Expand Up @@ -45,16 +48,27 @@ impl Task for PsuEventHubTask {

let mut join_set = JoinSet::new();

let secret_resolver = PowerShellWorker::new(psu_conf.powershell.clone());
let worker = Arc::new(
PowerShellWorker::new(psu_conf.powershell.clone()).context("failed to initialize PSU PowerShell worker")?,
);

for mut connection in psu_conf.connections {
if connection.hub.trim().is_empty() {
warn!(url = %connection.url, "Skipping PSU Event Hub connection without a hub name");
continue;
}

if let Err(error) = validate_connection(&connection) {
error!(
hub = %connection.hub,
error = format!("{error:#}"),
"Skipping PSU Event Hub connection because configuration is invalid"
);
continue;
}

if let Some(app_token) = connection.app_token.as_deref() {
match secret_resolver.resolve_app_token(app_token).await {
match worker.resolve_app_token(app_token).await {
Ok(resolved) => connection.app_token = Some(resolved),
Err(error) => {
error!(
Expand All @@ -67,7 +81,7 @@ impl Task for PsuEventHubTask {
}
}

let executor = EventHubExecutor::new(&connection, psu_conf.powershell.clone());
let executor = EventHubExecutor::new(&connection, Arc::clone(&worker));
let connection_shutdown_signal = shutdown_signal.clone();

join_set
Expand All @@ -85,3 +99,37 @@ impl Task for PsuEventHubTask {
Ok(())
}
}

fn validate_connection(connection: &dto::PsuEventHubConnectionConf) -> anyhow::Result<()> {
if connection.use_default_credentials && connection.app_token.is_none() {
anyhow::bail!(
"PSU Event Hub use_default_credentials is configured for hub {} but Windows default credentials are not implemented",
connection.hub
);
}

Ok(())
}

#[cfg(test)]
mod tests {
use url::Url;

use super::*;

// THOUGHT(CBenoit): The contract should probably be encoded at the type level instead of providing a separate validate_connection function.
// We need to verify if the type can be made stronger without breaking something else though.
#[test]
fn default_credentials_without_app_token_are_rejected() {
let connection = dto::PsuEventHubConnectionConf {
hub: "Hub".to_owned(),
url: Url::parse("http://localhost:5000").expect("parse URL"),
app_token: None,
use_default_credentials: true,
script_path: None,
description: None,
};

assert!(validate_connection(&connection).is_err());
Comment on lines +123 to +133

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: This should probably be encoded at the type level instead of through a validate function; making the runtime test unrequired.

}
}
10 changes: 10 additions & 0 deletions devolutions-agent/src/psu_event_hub/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ impl WebsocketEventResponse {
terminating_error: Some(message.into()),
}
}

pub(super) fn timeout(message: impl Into<String>) -> Self {
Self {
data: None,
job_outputs: Vec::new(),
complete: true,
timeout: true,
terminating_error: Some(message.into()),
}
}
}

impl Default for WebsocketEventResponse {
Expand Down
Loading
Loading