Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1089,14 +1089,15 @@ DurableObjectState::DurableObjectState(jsg::Lock& js,
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
kj::Maybe<rpc::Container::Client> container,
bool containerRunning,
kj::Maybe<kj::String> containerInstanceId,
kj::Maybe<Worker::Actor::FacetManager&> facetManager,
kj::Maybe<ActorVersion> version)
: id(kj::mv(actorId)),
exports(js, exports),
props(js, props),
storage(kj::mv(storage)),
container(container.map([&](rpc::Container::Client& cap) {
return js.alloc<Container>(kj::mv(cap), containerRunning);
return js.alloc<Container>(kj::mv(cap), containerRunning, kj::mv(containerInstanceId));
})),
facetManager(facetManager.map(
[](Worker::Actor::FacetManager& ref) { return IoContext::current().addObject(ref); })),
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/actor-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ class DurableObjectState: public jsg::Object {
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
kj::Maybe<rpc::Container::Client> container,
bool containerRunning,
kj::Maybe<kj::String> containerInstanceId,
kj::Maybe<Worker::Actor::FacetManager&> facetManager,
kj::Maybe<ActorVersion> version = kj::none);

Expand Down
34 changes: 31 additions & 3 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ void ExecProcess::kill(jsg::Lock& js, jsg::Optional<int> signal) {
// =======================================================================================
// Basic lifecycle methods

Container::Container(rpc::Container::Client rpcClient, bool running)
Container::Container(rpc::Container::Client rpcClient, bool running,
kj::Maybe<kj::String> instanceId)
: rpcClient(IoContext::current().addObject(kj::heap(kj::mv(rpcClient)))),
running(running) {}
running(running),
instanceId(kj::mv(instanceId)) {}

void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions) {
auto flags = FeatureFlags::get(js);
Expand Down Expand Up @@ -279,7 +281,26 @@ void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions)

req.setCompatibilityFlags(flags);

IoContext::current().addTask(req.sendIgnoringResult());
// Fire the start RPC. Update instanceId from the response as a side effect.
// Capture a strong self-reference (JSG_THIS) to guarantee the Container object
// stays alive until the background task completes, and capture the current
// lifecycle epoch to detect stale responses.
auto self = JSG_THIS;
auto epoch = lifecycleEpoch;
IoContext::current().addTask(
req.send().then(
[self = kj::mv(self), epoch](
capnp::Response<rpc::Container::StartResults> results) mutable {
if (epoch != self->lifecycleEpoch) {
// A destroy/monitor happened since this start() was issued. Discard.
return;
}
if (results.hasInstanceId()) {
self->instanceId = kj::str(results.getInstanceId());
}
}
)
);

running = true;
}
Expand Down Expand Up @@ -552,6 +573,8 @@ jsg::Promise<void> Container::monitor(jsg::Lock& js) {
.awaitIo(js, rpcClient->monitorRequest(capnp::MessageSize{4, 0}).send())
.then(js, [this](jsg::Lock& js, capnp::Response<rpc::Container::MonitorResults> results) {
running = false;
instanceId = kj::none;
++lifecycleEpoch;
auto exitCode = results.getExitCode();
KJ_IF_SOME(d, destroyReason) {
jsg::Value error = kj::mv(d);
Expand All @@ -566,6 +589,8 @@ jsg::Promise<void> Container::monitor(jsg::Lock& js) {
}
}, [this](jsg::Lock& js, jsg::Value&& error) {
running = false;
instanceId = kj::none;
++lifecycleEpoch;
destroyReason = kj::none;
js.throwException(kj::mv(error));
});
Expand All @@ -578,6 +603,9 @@ jsg::Promise<void> Container::destroy(jsg::Lock& js, jsg::Optional<jsg::Value> e
destroyReason = kj::mv(error);
}

instanceId = kj::none;
++lifecycleEpoch;

return IoContext::current().awaitIo(
js, rpcClient->destroyRequest(capnp::MessageSize{4, 0}).sendIgnoringResult());
}
Expand Down
12 changes: 11 additions & 1 deletion src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ExecProcess: public jsg::Object {
// etc.
class Container: public jsg::Object {
public:
Container(rpc::Container::Client rpcClient, bool running);
Container(rpc::Container::Client rpcClient, bool running, kj::Maybe<kj::String> instanceId);

struct DirectorySnapshot {
kj::String id;
Expand Down Expand Up @@ -230,6 +230,12 @@ class Container: public jsg::Object {
return running;
}

// Returns the unique instance identifier of the current container, or JS `undefined` when no
// identifier is stored.
//
// An identifier that is stored but empty is also reported as `undefined`: the schema comment
// on `status` in container.capnp defines the empty string as "no ID is available", and a
// Cap'n Proto Text field that was explicitly set to "" is still reported as present by its
// `has` accessor, so an empty value can reach this object.
jsg::Optional<kj::StringPtr> getInstanceId() const {
  KJ_IF_SOME(id, instanceId) {
    if (id.size() > 0) {
      // Hand out a non-owning view; no copy of the stored string is made here.
      return kj::StringPtr(id);
    }
  }
  return kj::none;
}

// Methods correspond closely to the RPC interface in `container.capnp`.
void start(jsg::Lock& js, jsg::Optional<StartupOptions> options);
jsg::Promise<void> monitor(jsg::Lock& js);
Expand All @@ -254,6 +260,7 @@ class Container: public jsg::Object {

JSG_RESOURCE_TYPE(Container, CompatibilityFlags::Reader flags) {
JSG_READONLY_PROTOTYPE_PROPERTY(running, getRunning);
JSG_READONLY_PROTOTYPE_PROPERTY(instanceId, getInstanceId);
JSG_METHOD(start);
JSG_METHOD(monitor);
JSG_METHOD(destroy);
Expand All @@ -274,11 +281,14 @@ class Container: public jsg::Object {

// Reports the fields retained by this object to the memory tracker.
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
  tracker.trackField("destroyReason", destroyReason);
  // The identifier is optional, so it is only reported while one is stored.
  KJ_IF_SOME(id, instanceId) {
    tracker.trackField("instanceId", id);
  }
}

private:
IoOwn<rpc::Container::Client> rpcClient;
bool running;
kj::Maybe<kj::String> instanceId;
uint64_t lifecycleEpoch = 0;

kj::Maybe<jsg::Value> destroyReason;

Expand Down
14 changes: 9 additions & 5 deletions src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ interface Container @0x9aaceefc06523bca {
# When the actor shuts down, workerd will drop the `Container` capability, at which point
# the container engine should implicitly destroy the container.

status @0 () -> (running :Bool);
# Returns the container's current status. The runtime will always call this at DO startup.

start @1 StartParams -> ();
# Start the container. It's an error to call this if the container is already running.
status @0 () -> (running :Bool, instanceId :Text);
# Returns the container's current status and unique instance identifier.
# `instanceId` is an opaque string that uniquely identifies this specific container instance.
# It changes every time a new container is provisioned (after sleep, crash, or eviction).
# Empty string means no ID is available (e.g., old runtime that predates this field).

start @1 StartParams -> (instanceId :Text);
# Start the container. Returns the unique instance identifier for the new container.
# It's an error to call this if the container is already running.

struct StartParams {
entrypoint @0 :List(Text);
Expand Down
12 changes: 11 additions & 1 deletion src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3887,6 +3887,7 @@ kj::Promise<void> Worker::Actor::ensureConstructedImpl(IoContext& context, Actor

try {
bool containerRunning = false;
kj::Maybe<kj::String> containerInstanceId;
KJ_IF_SOME(c, impl->container) {
// We need to do an RPC to check if the container is running.
// TODO(perf): It would be nice if we could have started this RPC earlier, e.g. in parallel
Expand All @@ -3895,9 +3896,17 @@ kj::Promise<void> Worker::Actor::ensureConstructedImpl(IoContext& context, Actor
// not a huge deal.
auto status = co_await c.statusRequest(capnp::MessageSize{4, 0}).send();
containerRunning = status.getRunning();
// Defense-in-depth: only accept instanceId when running AND field is present.
// This protects the JS invariant (instanceId !== undefined → running === true)
// even if a producer regresses and sends instanceId for non-running containers.
if (containerRunning && status.hasInstanceId()) {
containerInstanceId = kj::str(status.getInstanceId());
}
}

co_await context.run([this, &info, containerRunning](Worker::Lock& lock) {
co_await context.run([this, &info, containerRunning,
containerInstanceId = kj::mv(containerInstanceId)](
Worker::Lock& lock) mutable {
jsg::Lock& js = lock;

kj::Maybe<jsg::Ref<api::DurableObjectStorage>> storage;
Expand All @@ -3908,6 +3917,7 @@ kj::Promise<void> Worker::Actor::ensureConstructedImpl(IoContext& context, Actor
auto ctx = js.alloc<api::DurableObjectState>(js, cloneId(),
jsg::JsValue(KJ_ASSERT_NONNULL(lock.getWorker().impl->ctxExports).getHandle(js)),
impl->props.toJs(js), kj::mv(storage), kj::mv(impl->container), containerRunning,
kj::mv(containerInstanceId),
impl->facetManager, impl->version.map([](ActorVersion& v) {
return ActorVersion{.cohort = v.cohort.map([](kj::String& s) { return kj::str(s); })};
}));
Expand Down
25 changes: 19 additions & 6 deletions src/workerd/server/container-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
// We check if the container with the given name exist, and if it's not,
// we simply return false while avoiding an unnecessary error.
if (response.statusCode == 404) {
co_return InspectResponse{.isRunning = false};
co_return InspectResponse{.isRunning = false, .id = kj::none};
}

JSG_REQUIRE(response.statusCode == 200, Error, "Container inspect failed");
Expand All @@ -1533,7 +1533,7 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
// perspective, a restarting container is still "alive" and should be treated as running
// so that start() correctly refuses to start a duplicate and destroy() can clean it up.
bool running = status == "running" || status == "restarting";
co_return InspectResponse{.isRunning = running};
co_return InspectResponse{.isRunning = running, .id = kj::str(jsonRoot.getId())};
}

kj::Promise<kj::Maybe<ContainerClient::SidecarInspectResponse>> ContainerClient::inspectSidecar() {
Expand Down Expand Up @@ -2152,12 +2152,12 @@ kj::Promise<void> ContainerClient::status(StatusContext context) {
co_await ready;
KJ_DEFER(done->fulfill());

const auto [isRunning] = co_await inspectContainer();
containerStarted.store(isRunning, std::memory_order_release);
auto inspect = co_await inspectContainer();
containerStarted.store(inspect.isRunning, std::memory_order_release);
containerSidecarStarted.store(false, std::memory_order_release);
this->sidecarIngressHostPort = kj::none;

if (isRunning) {
if (inspect.isRunning) {
// If the sidecar container is already running (e.g. workerd restarted while
// containers stayed up), recover its published ingress port, then configure
// it to use our current egress listener port.
Expand All @@ -2170,7 +2170,14 @@ kj::Promise<void> ContainerClient::status(StatusContext context) {
co_await readCACert();
}

context.getResults().setRunning(isRunning);
auto results = context.getResults();
results.setRunning(inspect.isRunning);
// Only set instanceId when running (preserves JS invariant)
if (inspect.isRunning) {
KJ_IF_SOME(id, inspect.id) {
results.setInstanceId(id);
}
}
}

kj::Promise<void> ContainerClient::start(StartContext context) {
Expand Down Expand Up @@ -2252,6 +2259,12 @@ kj::Promise<void> ContainerClient::start(StartContext context) {
co_await startContainer();

containerStarted.store(true, std::memory_order_release);

// Set the Docker container ID as instanceId
auto inspect = co_await inspectContainer();
KJ_IF_SOME(id, inspect.id) {
context.getResults().setInstanceId(id);
}
}

kj::Promise<void> ContainerClient::monitor(MonitorContext context) {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/container-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte

// Result of inspecting the container.
struct InspectResponse {
  // Whether the container counts as running. Defaults to false so that an instance created
  // without an explicit value never carries an indeterminate flag.
  bool isRunning = false;
  // Docker container ID; kj::none if the container was not found (404).
  kj::Maybe<kj::String> id;
};

struct IPAMConfigResult {
Expand Down
20 changes: 20 additions & 0 deletions src/workerd/server/tests/container-client/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class DurableObjectExample extends DurableObject {
}

assert.strictEqual(container.running, false);
assert.strictEqual(container.instanceId, undefined);

// Start container with valid configuration
container.start({
Expand All @@ -90,10 +91,29 @@ export class DurableObjectExample extends DurableObject {

await this.waitUntilContainerIsHealthy();

assert.strictEqual(typeof container.instanceId, 'string');
assert.ok(container.instanceId.length > 0);
const firstInstanceId = container.instanceId;

await container.destroy();

await monitor;
assert.strictEqual(container.running, false);
assert.strictEqual(container.instanceId, undefined);

// Restart — instanceId must change (new container provisioned)
container.start({
env: { A: 'B', C: 'D', L: 'F' },
enableInternet: true,
});
const monitor2 = container.monitor().catch((_err) => {});
await this.waitUntilContainerIsHealthy();

assert.strictEqual(typeof container.instanceId, 'string');
assert.notStrictEqual(container.instanceId, firstInstanceId);

await container.destroy();
await monitor2;
}

async testExec() {
Expand Down
Loading