Skip to content

Commit a8a61b7

Browse files
committed
fix: adapt to ttrpc 0.9.0 async API changes
Signed-off-by: Maksym Pavlenko <mxpv@users.noreply.github.com>
1 parent 32dc3a7 commit a8a61b7

File tree

4 files changed

+18
-18
lines changed

4 files changed

+18
-18
lines changed

crates/shim-protos/examples/connect-async.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ async fn main() {
3232
let pid = args.get(2).map(|str| str.to_owned()).unwrap_or_default();
3333

3434
println!("Connecting to {}...", socket_path);
35-
let client = Client::connect(socket_path).expect("Failed to connect to shim");
35+
let client = Client::connect(socket_path)
36+
.await
37+
.expect("Failed to connect to shim");
3638

3739
let task_client = TaskClient::new(client);
3840

crates/shim-protos/examples/ttrpc-client-async.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ fn default_ctx() -> Context {
3030

3131
#[tokio::main]
3232
async fn main() {
33-
let c = Client::connect("unix:///tmp/shim-proto-ttrpc-001").unwrap();
33+
let c = Client::connect("unix:///tmp/shim-proto-ttrpc-001")
34+
.await
35+
.unwrap();
3436
let task = TaskClient::new(c);
3537
let now = std::time::Instant::now();
3638

crates/shim/src/asynchronous/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,11 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
349349

350350
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
351351
async fn create_server(flags: &args::Flags) -> Result<Server> {
352-
use std::os::fd::IntoRawFd;
352+
use containerd_shim_protos::ttrpc::r#async::transport::Listener;
353353
let listener = start_listener(&flags.socket).await?;
354-
let mut server = Server::new();
355-
server = server.add_listener(listener.into_raw_fd())?;
356-
server = server.set_domain_unix();
354+
let listener =
355+
Listener::try_from(listener).map_err(io_error!(e, "creating ttrpc listener"))?;
356+
let server = Server::new().add_listener(listener);
357357
Ok(server)
358358
}
359359

@@ -543,7 +543,7 @@ async fn start_listener(address: &str) -> Result<UnixListener> {
543543
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
544544
async fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
545545
for _i in 0..count {
546-
match Client::connect(address) {
546+
match Client::connect(address).await {
547547
Ok(_) => {
548548
return Ok(());
549549
}

crates/shim/src/asynchronous/publisher.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ impl RemotePublisher {
134134
})
135135
.await?;
136136

137-
// Client::new() takes ownership of the RawFd.
138-
Ok(Client::new(fd))
137+
// Safety: `fd` is a unix socket returned by `connect()`.
138+
// `from_raw_unix_socket_fd` takes ownership of the RawFd.
139+
Ok(unsafe { Client::from_raw_unix_socket_fd(fd) })
139140
}
140141

141142
/// Publish a new event.
@@ -195,17 +196,14 @@ impl Events for RemotePublisher {
195196

196197
#[cfg(test)]
197198
mod tests {
198-
use std::{
199-
os::unix::{io::AsRawFd, net::UnixListener},
200-
sync::Arc,
201-
};
199+
use std::{os::unix::net::UnixListener, sync::Arc};
202200

203201
use async_trait::async_trait;
204202
use containerd_shim_protos::{
205203
api::{Empty, ForwardRequest},
206204
events::task::TaskOOM,
207205
shim_async::{create_events, Events},
208-
ttrpc::asynchronous::Server,
206+
ttrpc::asynchronous::{transport::Listener, Server},
209207
};
210208
use tokio::sync::{
211209
mpsc::{channel, Sender},
@@ -247,13 +245,11 @@ mod tests {
247245
let barrier2 = barrier.clone();
248246
let server_thread = tokio::spawn(async move {
249247
let listener = UnixListener::bind(&path1).unwrap();
248+
let listener = Listener::try_from(listener).unwrap();
250249
let service = create_events(Arc::new(server));
251250
let mut server = Server::new()
252-
.set_domain_unix()
253-
.add_listener(listener.as_raw_fd())
254-
.unwrap()
251+
.add_listener(listener)
255252
.register_service(service);
256-
std::mem::forget(listener);
257253
server.start().await.unwrap();
258254
barrier2.wait().await;
259255

0 commit comments

Comments
 (0)