Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async-opcua-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub use session::{
SessionActivity, SessionBuilder, SessionConnectMode, SessionEventLoop, SessionPollResult,
Subscription, SubscriptionActivity, SubscriptionCallbacks, UARequest,
};
pub use transport::{AsyncSecureChannel, TcpConnector, TcpTransport};
pub use transport::AsyncSecureChannel;

/// This module contains utilities for reverse connect. Allowing you to
/// connect to a server by having the server initiate the connection to the client.
Expand Down
9 changes: 6 additions & 3 deletions async-opcua-client/src/transport/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{future::Future, sync::Arc};

use opcua_types::{EndpointDescription, Error, StatusCode};

use crate::transport::state::SecureChannelState;
use crate::transport::{state::SecureChannelState, RequestRecv};

use super::{tcp::TransportConfiguration, OutgoingMessage, TcpConnector, TransportPollResult};
use super::{tcp::TransportConfiguration, TcpConnector, TransportPollResult};

/// Trait implemented by simple wrapper types that create a connection to an OPC-UA server.
///
Expand All @@ -13,6 +13,9 @@ use super::{tcp::TransportConfiguration, OutgoingMessage, TcpConnector, Transpor
/// - This deals with connection establishment up to after exchange of HELLO/ACKNOWLEDGE
/// or equivalent.
/// - This should not do any retries, that's handled on a higher level.
///
/// Most implementations will want to use `StreamConnector` instead of doing the
/// hello/acknowledge exchange manually. See `TcpConnector` for an example of this.
pub trait Connector: Send + Sync {
/// The transport type created by this connector.
type Transport: Transport + Send + Sync + 'static;
Expand All @@ -23,7 +26,7 @@ pub trait Connector: Send + Sync {
fn connect(
&self,
channel: Arc<SecureChannelState>,
outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
outgoing_recv: RequestRecv,
config: TransportConfiguration,
) -> impl Future<Output = Result<Self::Transport, StatusCode>> + Send + Sync;

Expand Down
32 changes: 24 additions & 8 deletions async-opcua-client/src/transport/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use opcua_core::comms::{
use opcua_types::{Error, StatusCode};

use crate::transport::state::SecureChannelState;
use crate::transport::RequestRecv;

#[derive(Debug)]
struct MessageChunkWithChunkInfo {
Expand All @@ -29,13 +30,14 @@ pub(crate) struct MessageState {
deadline: Instant,
}

pub(super) struct TransportState {
/// Internal state of a transport implementation.
pub struct TransportState {
/// Channel for outgoing requests. Will only be polled if the number of inflight requests is below the limit.
outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
/// State of pending requests
message_states: HashMap<u32, MessageState>,
/// Secure channel
pub(super) channel_state: Arc<SecureChannelState>,
pub channel_state: Arc<SecureChannelState>,
/// Max pending incoming messages
max_chunk_count: usize,
/// Last decoded sequence number
Expand All @@ -45,6 +47,13 @@ pub(super) struct TransportState {
receive_buffer_size: usize,
}

#[derive(Debug, Clone, Copy)]
pub(super) enum TransportCloseState {
Comment thread
svanharmelen marked this conversation as resolved.
Open,
Closing(StatusCode),
Closed(StatusCode),
}

#[derive(Debug)]
/// Result of polling a transport implementation.
/// This represents a single iteration of the transport event loop.
Expand All @@ -62,16 +71,21 @@ pub enum TransportPollResult {
Closed(StatusCode),
}

/// An outgoing message to be sent by the transport.
pub struct OutgoingMessage {
/// The actual request message to send.
pub request: RequestMessage,
/// A callback that should be called when a response is received.
pub callback: Option<tokio::sync::oneshot::Sender<Result<ResponseMessage, StatusCode>>>,
/// Deadline for the request.
pub deadline: Instant,
}

impl TransportState {
pub(super) fn new(
/// Create a new transport state.
pub fn new(
channel_state: Arc<SecureChannelState>,
outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
outgoing_recv: RequestRecv,
max_chunk_count: usize,
receive_buffer_size: usize,
) -> Self {
Expand All @@ -91,7 +105,7 @@ impl TransportState {
}

/// Wait for an outgoing message. Will also check for timed out messages.
pub(super) async fn wait_for_outgoing_message(
pub async fn wait_for_outgoing_message(
&mut self,
send_buffer: &mut SendBuffer,
) -> Option<(RequestMessage, u32)> {
Expand Down Expand Up @@ -124,7 +138,7 @@ impl TransportState {
}

/// Store incoming messages in the message state.
pub(super) fn handle_incoming_message(&mut self, message: Message) -> Result<(), StatusCode> {
pub fn handle_incoming_message(&mut self, message: Message) -> Result<(), StatusCode> {
let status = match message {
Message::Acknowledge(ack) => {
debug!("Reader got an unexpected ack {:?}", ack);
Expand All @@ -151,7 +165,9 @@ impl TransportState {
}
}

pub(super) fn message_send_failed(&mut self, request_id: u32, err: StatusCode) {
/// Call this if sending a message fails. This will notify the waiting request
/// that the message could not be sent.
pub fn message_send_failed(&mut self, request_id: u32, err: StatusCode) {
if let Some(message_state) = self.message_states.remove(&request_id) {
let _ = message_state.callback.send(Err(err));
}
Expand Down Expand Up @@ -314,7 +330,7 @@ impl TransportState {
/// Close the transport, aborting any pending requests.
/// If `status` is good, the pending requests will be terminated with
/// `BadConnectionClosed`.
pub(super) async fn close(&mut self, status: StatusCode) -> StatusCode {
pub async fn close(&mut self, status: StatusCode) -> StatusCode {
// If the status is good, we still want to send a bad status code
// to the pending requests. They didn't succeed, after all.
let request_status = if status.is_good() {
Expand Down
10 changes: 7 additions & 3 deletions async-opcua-client/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ mod channel;
mod connect;
mod core;
mod state;
mod stream;
pub(super) mod tcp;

pub use channel::{AsyncSecureChannel, SecureChannelEventLoop};
pub use connect::{Connector, ConnectorBuilder, Transport};
pub(crate) use core::OutgoingMessage;
pub use core::TransportPollResult;
pub use tcp::{ReverseHelloVerifier, ReverseTcpConnector, TcpConnector, TcpTransport};
pub use core::{OutgoingMessage, TransportPollResult, TransportState};
pub use state::{RequestRecv, RequestSend, SecureChannelState};
pub use stream::{wait_for_reverse_hello, StreamConnection, StreamConnector, StreamTransport};
pub use tcp::{
ReverseHelloVerifier, ReverseTcpConnector, TcpConnector, TcpTransport, TransportConfiguration,
};
6 changes: 5 additions & 1 deletion async-opcua-client/src/transport/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use opcua_types::{
OpenSecureChannelResponse, RequestHeader, SecurityTokenRequestType, StatusCode,
};

pub(crate) type RequestSend = tokio::sync::mpsc::Sender<OutgoingMessage>;
/// Tokio channel for sending requests to the transport.
pub type RequestSend = tokio::sync::mpsc::Sender<OutgoingMessage>;
/// Tokio channel for receiving requests in the transport.
pub type RequestRecv = tokio::sync::mpsc::Receiver<OutgoingMessage>;

/// The state of the secure channel used by the transport.
pub struct SecureChannelState {
Expand Down Expand Up @@ -211,6 +214,7 @@ impl SecureChannelState {
self.authentication_token.store(Arc::new(token));
}

/// Get a reference to the secure channel.
pub fn secure_channel(&self) -> &RwLock<SecureChannel> {
&self.secure_channel
}
Expand Down
Loading