Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/itk-nightly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Nightly ITK

on:
schedule:
- cron: '0 2 * * *'
workflow_dispatch:

permissions:
contents: write

jobs:
nightly:
name: Nightly ITK Run
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Set up JDK 17
uses: actions/setup-java@v5
with:
java-version: '17'
distribution: 'temurin'
cache: maven

- name: Run Nightly ITK Tests
run: bash run_itk.sh
working-directory: itk
env:
A2A_ITK_REVISION: main
ITK_NIGHTLY_RUN: "True"

- name: Upload Results to Rolling Release
uses: softprops/action-gh-release@v3
with:
tag_name: "nightly-metrics"
prerelease: true
files: |
itk/itk_java.json
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
44 changes: 44 additions & 0 deletions .github/workflows/itk.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: ITK

on:
push:
branches: [main]
pull_request:
paths:
- 'client/**'
- 'common/**'
- 'http-client/**'
- 'itk/**'
- 'jsonrpc-common/**'
- 'reference/**'
- 'server-common/**'
- 'spec/**'
- 'spec-grpc/**'
- 'transport/**'
- 'pom.xml'
- '.github/workflows/itk.yaml'

permissions:
contents: read

jobs:
itk:
name: ITK
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Set up JDK 17
uses: actions/setup-java@v5
with:
java-version: '17'
distribution: 'temurin'
cache: maven

- name: Run ITK Tests
run: bash run_itk.sh
working-directory: itk
env:
A2A_ITK_REVISION: main
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class ExtrasBomVerifier extends DynamicBomVerifier {
private static final Set<String> EXTRAS_EXCLUSIONS = Set.of(
"boms/", // BOM test modules themselves
"examples/", // Example applications
"itk/", // Integration Test Kit agent
"tck/", // TCK test suite
"tests/", // Integration tests
"test-utils-docker/", // Test utilities for Docker-based tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class ReferenceBomVerifier extends DynamicBomVerifier {
private static final Set<String> REFERENCE_EXCLUSIONS = Set.of(
"boms/", // BOM test modules themselves
"examples/", // Example applications
"itk/", // Integration Test Kit agent
"tck/", // TCK test suite
"tests/", // Integration tests
"test-utils-docker/", // Test utilities for Docker-based tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class SdkBomVerifier extends DynamicBomVerifier {
private static final Set<String> SDK_EXCLUSIONS = Set.of(
"boms/", // BOM test modules themselves
"examples/", // Example applications
"itk/", // Integration Test Kit agent
"tck/", // TCK test suite
"compat-0.3/tck/", // Compat 0.3 TCK (not yet enabled)
"compat-0.3/reference/", // Compat 0.3 reference implementations (in reference BOM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public SSEEventListener(Consumer<StreamingEventKind> eventHandler,
@Override
public void onMessage(ServerSentEvent event, @Nullable Future<Void> completableFuture) {
try {
log.fine("Streaming message received: " + event.data());
log.fine("REST SSE raw data: " + event.data());
org.a2aproject.sdk.grpc.StreamResponse.Builder builder = org.a2aproject.sdk.grpc.StreamResponse.newBuilder();
JsonFormat.parser().merge(event.data(), builder);
parseAndHandleMessage(builder.build(), completableFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Task_v0_3 getCurrentTask() throws A2AClientInvalidStateError_v0_3 {
}

public Task_v0_3 saveTaskEvent(Task_v0_3 task) throws A2AClientInvalidArgsError_v0_3 {
if (currentTask != null) {
if (currentTask != null && !currentTask.id().equals(task.id())) {
throw new A2AClientInvalidArgsError_v0_3("Task is already set, create new manager for new tasks.");
}
saveTask(task);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.a2aproject.sdk.compat03.server.grpc.quarkus;

import java.util.concurrent.Executor;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.a2aproject.sdk.server.util.async.Internal;
import io.grpc.Context;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

/**
* v0.3 variant of {@code BlockingOffloadInterceptor} in the reference/grpc module.
*/
@ApplicationScoped
public class BlockingOffloadInterceptor_v0_3 implements ServerInterceptor {

private final Executor executor;

@Inject
public BlockingOffloadInterceptor_v0_3(@Internal Executor executor) {
this.executor = executor;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

MethodDescriptor.MethodType type = call.getMethodDescriptor().getType();
if (type == MethodDescriptor.MethodType.CLIENT_STREAMING
|| type == MethodDescriptor.MethodType.BIDI_STREAMING) {
return next.startCall(call, headers);
}

ServerCall.Listener<ReqT> delegate = next.startCall(call, headers);

return new SimpleForwardingServerCallListener<ReqT>(delegate) {
@Override
public void onHalfClose() {
Context grpcContext = Context.current().fork();
try {
executor.execute(() -> {
Context previous = grpcContext.attach();
try {
super.onHalfClose();
} finally {
grpcContext.detach(previous);
}
});
} catch (Exception e) {
call.close(Status.INTERNAL.withDescription("Failed to offload to worker thread: " + e.getMessage()), new Metadata());
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.RegisterInterceptor;
import io.quarkus.security.Authenticated;
import io.smallrye.common.annotation.Blocking;
import org.a2aproject.sdk.compat03.conversion.Convert_v0_3_To10RequestHandler;
import org.a2aproject.sdk.compat03.spec.AgentCard_v0_3;
import org.a2aproject.sdk.compat03.transport.grpc.handler.CallContextFactory_v0_3;
Expand All @@ -17,7 +18,9 @@

@GrpcService
@RegisterInterceptor(A2AExtensionsInterceptor_v0_3.class)
@RegisterInterceptor(BlockingOffloadInterceptor_v0_3.class)
@Authenticated
@Blocking
public class QuarkusGrpcHandler_v0_3 extends GrpcHandler_v0_3 {

private final AgentCard_v0_3 agentCard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void setupRoutes(@Observes Router router) {
} catch (Exception e) {
VertxSecurityHelper.handleGenericError(ctx);
}
});
}, false);

// Only register v0.3 agent card if no real v1.0 agent card producer exists.
// DefaultProducers provides a @DefaultBean AgentCard fallback that is always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,52 @@ void setupRouter(@Observes @Priority(10) Router router) {
.handler(BodyHandler.create())
.blockingHandler(authenticated(ctx -> {
sendMessage(extractBody(ctx), ctx);
}));
}), false);

// POST /v1/message:stream
router.postWithRegex("^\\/v1\\/message:stream$")
.handler(BodyHandler.create())
.blockingHandler(authenticatedStreaming(ctx -> {
sendMessageStreaming(extractBody(ctx), ctx);
}));
}), false);

// GET /v1/tasks/:id
router.get("/v1/tasks/:id")
.order(1)
.blockingHandler(authenticated(this::getTask));
.blockingHandler(authenticated(this::getTask), false);

// POST /v1/tasks/{id}:cancel
router.postWithRegex("^\\/v1\\/tasks\\/([^/]+):cancel$")
.order(1)
.blockingHandler(authenticated(this::cancelTask));
.blockingHandler(authenticated(this::cancelTask), false);

// POST /v1/tasks/{id}:subscribe
router.postWithRegex("^\\/v1\\/tasks\\/([^/]+):subscribe$")
.order(1)
.blockingHandler(authenticatedStreaming(this::resubscribeTask));
.blockingHandler(authenticatedStreaming(this::resubscribeTask), false);

// POST /v1/tasks/:id/pushNotificationConfigs
router.post("/v1/tasks/:id/pushNotificationConfigs")
.order(1)
.handler(BodyHandler.create())
.blockingHandler(authenticated(ctx -> {
setTaskPushNotificationConfiguration(extractBody(ctx), ctx);
}));
}), false);

// GET /v1/tasks/:id/pushNotificationConfigs/:configId
router.get("/v1/tasks/:id/pushNotificationConfigs/:configId")
.order(1)
.blockingHandler(authenticated(this::getTaskPushNotificationConfiguration));
.blockingHandler(authenticated(this::getTaskPushNotificationConfiguration), false);

// GET /v1/tasks/:id/pushNotificationConfigs
router.get("/v1/tasks/:id/pushNotificationConfigs")
.order(2)
.blockingHandler(authenticated(this::listTaskPushNotificationConfigurations));
.blockingHandler(authenticated(this::listTaskPushNotificationConfigurations), false);

// DELETE /v1/tasks/:id/pushNotificationConfigs/:configId
router.delete("/v1/tasks/:id/pushNotificationConfigs/:configId")
.order(1)
.blockingHandler(authenticated(this::deleteTaskPushNotificationConfiguration));
.blockingHandler(authenticated(this::deleteTaskPushNotificationConfiguration), false);

// Only register v0.3 agent card if no real v1.0 agent card producer exists.
// DefaultProducers provides a @DefaultBean AgentCard fallback that is always
Expand All @@ -136,7 +136,7 @@ void setupRouter(@Observes @Priority(10) Router router) {
router.get("/v1/card")
.order(1)
.produces(APPLICATION_JSON)
.blockingHandler(authenticated(this::getAuthenticatedExtendedCard));
.blockingHandler(authenticated(this::getAuthenticatedExtendedCard), false);
}

private Handler<RoutingContext> authenticated(Consumer<RoutingContext> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ default FileContent toV10(FileContent_v0_3 v03) {
}

if (v03 instanceof FileWithBytes_v0_3 v03Bytes) {
return new FileWithBytes(v03Bytes.mimeType(), v03Bytes.name(), v03Bytes.bytes());
String name = v03Bytes.name() != null ? v03Bytes.name() : "";
return new FileWithBytes(v03Bytes.mimeType(), name, v03Bytes.bytes());
} else if (v03 instanceof FileWithUri_v0_3 v03Uri) {
return new FileWithUri(v03Uri.mimeType(), v03Uri.name(), v03Uri.uri());
String name = v03Uri.name() != null ? v03Uri.name() : "";
return new FileWithUri(v03Uri.mimeType(), name, v03Uri.uri());
}

throw new InvalidRequestError(null, "Unrecognized FileContent type: " + v03.getClass().getName(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.a2aproject.sdk.server.auth.UnauthenticatedUser;
import org.a2aproject.sdk.server.auth.User;
import org.a2aproject.sdk.spec.A2AError;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

Expand Down Expand Up @@ -235,6 +236,16 @@ public void sendStreamingMessage(org.a2aproject.sdk.compat03.grpc.SendMessageReq

try {
ServerCallContext context = createCallContext(responseObserver);
Context forked = Context.current().fork();
context.getState().put(ServerCallContext.EXECUTION_WRAPPER_KEY,
(java.util.function.UnaryOperator<Runnable>) (runnable -> () -> {
Context prev = forked.attach();
try {
runnable.run();
} finally {
forked.detach(prev);
}
}));
MessageSendParams_v0_3 params = FromProto.messageSendParams(request);
Flow.Publisher<StreamingEventKind_v0_3> publisher = requestHandler.onMessageSendStream(params, context);
convertToStreamResponse(publisher, responseObserver);
Expand All @@ -259,6 +270,16 @@ public void taskSubscription(org.a2aproject.sdk.compat03.grpc.TaskSubscriptionRe

try {
ServerCallContext context = createCallContext(responseObserver);
Context forkedSub = Context.current().fork();
context.getState().put(ServerCallContext.EXECUTION_WRAPPER_KEY,
(java.util.function.UnaryOperator<Runnable>) (runnable -> () -> {
Context prev = forkedSub.attach();
try {
runnable.run();
} finally {
forkedSub.detach(prev);
}
}));
TaskIdParams_v0_3 params = FromProto.taskIdParams(request);
Flow.Publisher<StreamingEventKind_v0_3> publisher = requestHandler.onResubscribeToTask(params, context);
convertToStreamResponse(publisher, responseObserver);
Expand Down
4 changes: 4 additions & 0 deletions itk/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
a2a-itk/
logs/
raw_results.json
itk_java.json
Loading
Loading