Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion async-opcua-client/src/transport/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,16 @@ impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> StreamTransport<R, W> {
incoming: Option<Result<Message, std::io::Error>>,
) -> TransportPollResult {
let Some(incoming) = incoming else {
error!("Stream unexpectedly closed by peer");
return TransportPollResult::Closed(StatusCode::BadCommunicationError);
};
match incoming {
Ok(message) => {
if let Err(e) = self.state.handle_incoming_message(message) {
error!(
"Failed to handle incoming message, closing transport: {}",
e
);
TransportPollResult::Closed(e)
} else {
TransportPollResult::IncomingMessage
Expand All @@ -279,6 +284,7 @@ impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> StreamTransport<R, W> {
if self.send_buffer.should_encode_chunks() {
let secure_channel = trace_read_lock!(self.state.channel_state.secure_channel());
if let Err(e) = self.send_buffer.encode_next_chunk(&secure_channel) {
error!("Failed to encode chunk, closing transport: {}", e);
return TransportPollResult::Closed(e);
}
}
Expand All @@ -290,7 +296,7 @@ impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> StreamTransport<R, W> {
tokio::select! {
r = self.send_buffer.read_into_async(&mut self.write) => {
if let Err(e) = r {
error!("write bytes task failed: {}", e);
error!("Writing outgoing message to stream failed: {}", e);
return TransportPollResult::Closed(StatusCode::BadCommunicationError);
}
TransportPollResult::OutgoingMessageSent
Expand Down Expand Up @@ -323,6 +329,7 @@ impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> StreamTransport<R, W> {
self.state.message_send_failed(request_id, e.status());
TransportPollResult::RecoverableError(e.status())
} else {
error!("Failed to write outgoing message to send buffer, closing transport: {}", e);
TransportPollResult::Closed(e.status())
}
} else {
Expand Down
9 changes: 2 additions & 7 deletions async-opcua-core/src/comms/message_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,8 @@ impl SimpleBinaryDecodable for MessageChunk {
decoding_options: &DecodingOptions,
) -> EncodingResult<Self> {
// Read the header out first
let chunk_header =
MessageChunkHeader::decode(in_stream, decoding_options).map_err(|err| {
Error::new(
StatusCode::BadCommunicationError,
format!("Cannot decode chunk header {err:?}"),
)
})?;
let chunk_header = MessageChunkHeader::decode(in_stream, decoding_options)
.map_err(|err| Error::decoding(format!("Cannot decode chunk header {err:?}")))?;

let message_size = chunk_header.message_size as usize;
if decoding_options.max_message_size > 0 && message_size > decoding_options.max_message_size
Expand Down