Skip to content

Commit e289298

Browse files
chore: Add test to verify workflow reconnection logic (#1692)
Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 54833d7 commit e289298

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright 2026 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package io.dapr.durabletask;
15+
16+
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
17+
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
18+
import io.grpc.ManagedChannel;
19+
import io.grpc.Server;
20+
import io.grpc.Status;
21+
import io.grpc.inprocess.InProcessChannelBuilder;
22+
import io.grpc.inprocess.InProcessServerBuilder;
23+
import io.grpc.stub.StreamObserver;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
/**
34+
* Tests that DurableTaskGrpcWorker auto-heals (reconnects) when the gRPC
35+
* connection to the sidecar drops with UNAVAILABLE or CANCELLED status.
36+
*
37+
* @see <a href="https://github.com/dapr/java-sdk/issues/1652">Issue #1652</a>
38+
*/
39+
class DurableTaskGrpcWorkerReconnectTest {
40+
41+
private DurableTaskGrpcWorker worker;
42+
private Server server;
43+
private ManagedChannel channel;
44+
45+
@AfterEach
46+
void tearDown() throws Exception {
47+
if (worker != null) {
48+
worker.close();
49+
}
50+
if (channel != null) {
51+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
52+
}
53+
if (server != null) {
54+
server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
55+
}
56+
}
57+
58+
@Test
59+
void workerReconnectsAfterUnavailableError() throws Exception {
60+
int requiredCalls = 3;
61+
CountDownLatch latch = new CountDownLatch(requiredCalls);
62+
AtomicInteger callCount = new AtomicInteger(0);
63+
64+
String serverName = InProcessServerBuilder.generateName();
65+
server = InProcessServerBuilder.forName(serverName)
66+
.directExecutor()
67+
.addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
68+
@Override
69+
public void getWorkItems(
70+
OrchestratorService.GetWorkItemsRequest request,
71+
StreamObserver<OrchestratorService.WorkItem> responseObserver) {
72+
callCount.incrementAndGet();
73+
latch.countDown();
74+
// Simulate sidecar being unavailable
75+
responseObserver.onError(Status.UNAVAILABLE
76+
.withDescription("Sidecar is unavailable")
77+
.asRuntimeException());
78+
}
79+
})
80+
.build()
81+
.start();
82+
83+
channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
84+
85+
worker = new DurableTaskGrpcWorkerBuilder()
86+
.grpcChannel(channel)
87+
.build();
88+
worker.start();
89+
90+
// The worker should retry multiple times after UNAVAILABLE errors.
91+
// With a 5-second retry delay, we wait long enough for at least 3 attempts.
92+
boolean reached = latch.await(30, TimeUnit.SECONDS);
93+
assertTrue(reached,
94+
"Expected at least " + requiredCalls + " getWorkItems calls (reconnect attempts), but got " + callCount.get());
95+
}
96+
97+
@Test
98+
void workerReconnectsAfterCancelledError() throws Exception {
99+
int requiredCalls = 2;
100+
CountDownLatch latch = new CountDownLatch(requiredCalls);
101+
AtomicInteger callCount = new AtomicInteger(0);
102+
103+
String serverName = InProcessServerBuilder.generateName();
104+
server = InProcessServerBuilder.forName(serverName)
105+
.directExecutor()
106+
.addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
107+
@Override
108+
public void getWorkItems(
109+
OrchestratorService.GetWorkItemsRequest request,
110+
StreamObserver<OrchestratorService.WorkItem> responseObserver) {
111+
callCount.incrementAndGet();
112+
latch.countDown();
113+
// Simulate connection cancelled (e.g., sidecar restart)
114+
responseObserver.onError(Status.CANCELLED
115+
.withDescription("Connection cancelled")
116+
.asRuntimeException());
117+
}
118+
})
119+
.build()
120+
.start();
121+
122+
channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
123+
124+
worker = new DurableTaskGrpcWorkerBuilder()
125+
.grpcChannel(channel)
126+
.build();
127+
worker.start();
128+
129+
boolean reached = latch.await(30, TimeUnit.SECONDS);
130+
assertTrue(reached,
131+
"Expected at least " + requiredCalls + " getWorkItems calls after CANCELLED, but got " + callCount.get());
132+
}
133+
134+
@Test
135+
void workerReconnectsAfterStreamEndsNormally() throws Exception {
136+
int requiredCalls = 2;
137+
CountDownLatch latch = new CountDownLatch(requiredCalls);
138+
AtomicInteger callCount = new AtomicInteger(0);
139+
140+
String serverName = InProcessServerBuilder.generateName();
141+
server = InProcessServerBuilder.forName(serverName)
142+
.directExecutor()
143+
.addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
144+
@Override
145+
public void getWorkItems(
146+
OrchestratorService.GetWorkItemsRequest request,
147+
StreamObserver<OrchestratorService.WorkItem> responseObserver) {
148+
callCount.incrementAndGet();
149+
latch.countDown();
150+
// Simulate stream ending normally (server completes without sending items)
151+
responseObserver.onCompleted();
152+
}
153+
})
154+
.build()
155+
.start();
156+
157+
channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
158+
159+
worker = new DurableTaskGrpcWorkerBuilder()
160+
.grpcChannel(channel)
161+
.build();
162+
worker.start();
163+
164+
// When the stream ends normally, the outer while(true) loop should
165+
// re-establish the stream immediately (no 5s delay for normal completion).
166+
boolean reached = latch.await(10, TimeUnit.SECONDS);
167+
assertTrue(reached,
168+
"Expected at least " + requiredCalls + " getWorkItems calls after normal stream end, but got " + callCount.get());
169+
}
170+
171+
@Test
172+
void workerStopsCleanlyOnClose() throws Exception {
173+
CountDownLatch firstCallLatch = new CountDownLatch(1);
174+
175+
String serverName = InProcessServerBuilder.generateName();
176+
server = InProcessServerBuilder.forName(serverName)
177+
.directExecutor()
178+
.addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
179+
@Override
180+
public void getWorkItems(
181+
OrchestratorService.GetWorkItemsRequest request,
182+
StreamObserver<OrchestratorService.WorkItem> responseObserver) {
183+
firstCallLatch.countDown();
184+
// Keep stream open (simulate connected state)
185+
// The worker should be interrupted by close()
186+
}
187+
})
188+
.build()
189+
.start();
190+
191+
channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
192+
193+
worker = new DurableTaskGrpcWorkerBuilder()
194+
.grpcChannel(channel)
195+
.build();
196+
worker.start();
197+
198+
// Wait for the worker to connect
199+
assertTrue(firstCallLatch.await(10, TimeUnit.SECONDS), "Worker should have connected");
200+
201+
// Close should stop the worker cleanly without hanging
202+
worker.close();
203+
worker = null; // prevent double-close in tearDown
204+
}
205+
}

0 commit comments

Comments
 (0)