Skip to content
1 change: 0 additions & 1 deletion contributing/samples/gepa/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from tau_bench.types import EnvRunResult
from tau_bench.types import RunConfig
import tau_bench_agent as tau_bench_agent_lib

import utils


Expand Down
1 change: 0 additions & 1 deletion contributing/samples/gepa/run_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from absl import flags
import experiment
from google.genai import types

import utils

_OUTPUT_DIR = flags.DEFINE_string(
Expand Down
1 change: 1 addition & 0 deletions src/google/adk/agents/parallel_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async def process_an_agent(events_for_one_agent):
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)


class ParallelAgent(BaseAgent):
Expand Down
14 changes: 11 additions & 3 deletions src/google/adk/flows/llm_flows/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@
_TOOL_THREAD_POOLS: dict[int, ThreadPoolExecutor] = {}
_TOOL_THREAD_POOL_LOCK = threading.Lock()

# Sentinel object used to distinguish a FunctionTool that legitimately returns
# None from a non-FunctionTool sync tool that skips thread-pool execution.
# Using None as a sentinel would cause tools whose underlying function has no
# explicit return statement (implicit None) to fall through to the async
# fallback path and execute a second time.
_SYNC_TOOL_RESULT_UNSET = object()


def _is_live_request_queue_annotation(param: inspect.Parameter) -> bool:
"""Check whether a parameter is annotated as LiveRequestQueue.
Expand Down Expand Up @@ -159,13 +166,14 @@ def run_sync_tool():
}
return tool.func(**args_to_call)
else:
# For other sync tool types, we can't easily run them in thread pool
return None
# For other sync tool types, we can't easily run them in thread pool.
# Return the sentinel so the caller knows to fall back to run_async.
return _SYNC_TOOL_RESULT_UNSET

result = await loop.run_in_executor(
executor, lambda: ctx.run(run_sync_tool)
)
if result is not None:
if result is not _SYNC_TOOL_RESULT_UNSET:
return result
else:
# For async tools, run them in a new event loop in a background thread.
Expand Down
36 changes: 36 additions & 0 deletions tests/unittests/agents/test_parallel_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.base_agent import BaseAgentState
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.parallel_agent import _merge_agent_run_pre_3_11
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.sequential_agent import SequentialAgentState
Expand Down Expand Up @@ -373,3 +374,38 @@ async def test_stop_agent_if_sub_agent_fails(
async for _ in agen:
# The infinite agent could iterate a few times depending on scheduling.
pass


async def _slow_agent_with_cleanup_delay():
"""Async generator that sleeps in its finally block to simulate cleanup."""
try:
await asyncio.sleep(10)
yield 'slow-event'
finally:
await asyncio.sleep(0.05)


async def _failing_agent():
"""Async generator that raises after a short delay."""
await asyncio.sleep(0.01)
raise ValueError('simulated sub-agent failure')
yield # pragma: no cover


@pytest.mark.asyncio
async def test_merge_agent_run_pre_3_11_no_aclose_error_on_failure():
"""Regression test for Python 3.10 RuntimeError: aclose() already running.

_merge_agent_run_pre_3_11 must await all cancelled tasks before returning so
that generators are fully released before the caller invokes aclose() on them.
"""
agent_runs = [_slow_agent_with_cleanup_delay(), _failing_agent()]

with pytest.raises(ValueError, match='simulated sub-agent failure'):
async for _ in _merge_agent_run_pre_3_11(agent_runs):
pass

# If tasks were not properly awaited, aclose() on a still-running generator
# would raise RuntimeError here.
for agen in agent_runs:
await agen.aclose()
50 changes: 50 additions & 0 deletions tests/unittests/flows/llm_flows/test_functions_thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,56 @@ def blocking_sleep() -> dict:
event_loop_ticks >= 5
), f'Event loop should have ticked at least 5 times, got {event_loop_ticks}'

@pytest.mark.asyncio
@pytest.mark.parametrize(
'return_value,use_implicit_return',
[
(None, True), # implicit None (no return statement)
(None, False), # explicit `return None`
(0, False), # falsy int
('', False), # falsy str
({}, False), # falsy dict
(False, False), # falsy bool
],
)
async def test_sync_tool_falsy_return_executes_exactly_once(
self, return_value, use_implicit_return
):
"""FunctionTools returning None or other falsy values must execute exactly once.

Regression test for https://github.com/google/adk-python/issues/5284.
Previously, a None return was mistaken for the internal sentinel used to
signal 'non-FunctionTool, fall back to run_async', causing a second
invocation. The fix uses an identity-based sentinel so that None and other
falsy values (0, '', {}, False) are treated as valid results.
"""
call_count = 0

def sync_func():
nonlocal call_count
call_count += 1
if not use_implicit_return:
return return_value
# implicit None — no return statement

tool = FunctionTool(sync_func)
model = testing_utils.MockModel.create(responses=[])
agent = Agent(name='test_agent', model=model, tools=[tool])
invocation_context = await testing_utils.create_invocation_context(
agent=agent, user_content=''
)
tool_context = ToolContext(
invocation_context=invocation_context,
function_call_id='test_id',
)

result = await _call_tool_in_thread_pool(tool, {}, tool_context)

assert result == return_value
assert (
call_count == 1
), f'Tool function executed {call_count} time(s); expected exactly 1.'

@pytest.mark.asyncio
async def test_sync_tool_exception_propagates(self):
"""Test that exceptions from sync tools propagate correctly."""
Expand Down